美文网首页
系统优化专题5——Stream

系统优化专题5——Stream

作者: 我菠菜今天就是要为所欲为 | 来源:发表于2020-10-14 08:48 被阅读0次

    在Java8中,集合类Collection新增了两个流方法,分别是Stream()和ParallelStream()。

    什么是Stream

    在Java8之前,我们通总是通过for循环或者Iterator迭代来重新排序合并数据,又或者过重新定义Collections.sorts的Comparator方法来实现,这两种方式对于大数据量系统耒说,效率并不是很理想。

    Java8添加了一个新的口类Stream,他和我们之前接触的字节流概念不太一样,Java8集合的Stream相当于高级版的Iterator,他可以通过Lambda表达式对集合进行各种非便利、高效的聚合操作(AggregateOperation),或者大批量数据操作(Bulk Data Operation).

    Stream的聚合操作与数据库SQL的聚台操作sorted、filter、map等类似。我们在应用层就可以高效地实现类似数据库SQL的聚合操作了,而在数据作方面Stream不仅可以通过串行的方式实现数据操作,还可以通过并行的方式处理大批量数据,提高数据的处理效率。

    举例:

    过滤分组一所中学里身高在160cm以上的男女同学,传统的代码实现:

    Map<String,List<Student>> stuMap = new HashMap<>();
    for(Student stu:studentsList){
        if(stu.getHeight() > 160){//身高大于160cm
            if(stuMap.get(stu.getSex()) == null){//性别没分类
                List<Student> list = new ArrayList<>();
                list.add(stu);//把学生放进列表
                stuMap.put(stu.getSex(),list);//把表放入map
            } else {//该性别分类已存在
                stuMap.get(stu.getSex()).add(stu);
            }
        }
    }
    

    再看看Java8中的Stream API进行实现:

    //串行
    Map<String,List<Student>> stuMap = stuList.Stream().filter(Student s) -> s.getHeight() > 160).collect(Collectors.groupingBy(Student ::getSex));
    //并行
    Map<String,List<Student>> stuMap = stuList.parallelStream().filter(Student s) -> s.getHeight() > 160).collect(Collectors.groupingBy(Student ::getSex));
    

    可以看到,使用Stream结合Lambda表达式实现遍历筛选功能非常简单。

    Stream如何优化遍历

    Stream是如何做到优化迭代的?并行又是如何实现的?我们通过源码看一下Stream的实现原理。

    Stream操作分类

    Stream的操作分类其实是实现高效迭代大数据集合的重要原因之一。

    官方将Stream中的操作分为两大类:中间操作(Intermediate operations)和终结操作(Terminal operations)。中间操作只对操作进行了记景,即只会返回一个流,不会进行计算操作,而终结操作是实现了计算操作。

    中间操作又可以分为无状态(Stateless)与有状态(Stateful)操作,前者是指元素的处理不受之前元素的影响,后者是指该操作只有拿到所有元素之后才能继续下去。

    终结操作又可以分为短路(Short-circuiting)与非短路(Unshort-circuiting)操作,前者是指遇到某些符合条件的元素就可以得到最终结果,后者是指必须处理完所有元素才能得到最终结果。操作分类详情如下图所示:

    image.png

    我们通常还会将中间操作称为懒操作,也正是由这种懒操作结合终结操作、数据源构成的处理管道(Pipeline),实现了Stream的高效。

    Stream源码实现

    在了解Stream如何工作之前,我们先来了解下Stream包是由哪些主要结构类组合而成的,各个类的职责参照下图:

    image.png

    BaseStream和Stream为最顶端的接口类。

    BaseStream主要定义了流的基本接口方法,例如spliterator、isParallel等,Stream则定义了一些流的常用操作方法,例如,map、filter等。

    ReferencePipeline是一个结构类,他通过定义内部类组装了各种操作流。他定义了Head、StatelessOp、StatefulOp三个内部类,实现了BaseStream与Stream的接口方法。

    Sink接口是定义每个Stream操作之间关系的协议,他包含begin()、end()、
    cancellationRequested()、accpt()四个方法。

    ReferencePipeline最终会将整个Stream流操作组装成一个调用链,而调用链上的各个Stream操作的上下关系就是通过Sink接口协议来定义实现的。

    Stream操作叠加

    一个Stream的各个操作是由处理管道组装,并统一完成数据处理的。在JDK每次的中断操作会以使用阶段(Stage)命名。

    管道结构通常是由ReferencePipeline类实现的,前面提到过,ReferencePipeline包含了Head、StatelessOp、StatefulOp三种内部类。

    Head类主要来定义数据源操作,在我们初次调用xxx.stream()方法时,会初次加载Head对象,此时为加载数据源操作。

    接着加载的是中间操作,分别为无状态中间操作StatelessOp对象和有状态操作StatefulOp对象,此时的Stage并没有执行,而是通过AbstractPipeline生成了一个中间操作Stage链表。

    当我们调用终结操作时,会生成一个最终的Stage,通过这个个Stage触发之前的中间操作,从最后一个Stage开始,递归产生一个Sink链。

    如下图所示:

    image.png

    下面通过一个例子来感受一下Stream的操作分类是如何实现高效迭代大数据集合的。

    List<String> names = Arrays.asList("张三","李四","王老五","李三","刘老四","王小二","张四","张五六七");
    
    String maxLenStarWithZ = names.stream()
        .filter(name -> name.startsWith("张"))
        .mapToInt(String::length)
        .max()
        .toString();
    

    这个例子的操作是查出一个以张开头,长度最长的名字。从经验上,我们会认为是这样的操作流程:

    1. 首先遍历一次集合,得到以‘张’开头的所有名字;
    2. 遍历一次filter得到的集合,将名字转换成数字长度;
    3. 最后找到最长的名字并返回。

    但,实际情况并非这样,下面我们来逐步分析一下这个操作是如何执行的。

    首先,由于names是ArrayList集合,所以names.stream()方法将会调用集合类基础接口Collection的Stream方法:

    default Stream<E> stream(){
        return StreamSupport.stream(spliterator(),false);
    }
    

    然后,Stream方法会调用StreamSupport类的Stream方法,方法中初始化了一个ReferencePipeline的Head内部类对象:

    public static <T> Stream<T> (Spliterator<T> spliterator,boolean parallel) {
        Objects.requireNonNull(spliterator);
        return new ReferencePipeline.Head<>(spliterator,StreamOpFlag.fromCharacteristics(spliterator),parallel);
    }
    

    再调用filter和map方法,这两个方法都是无状态的中间操作,所以执行filter和map作时,并没有进行任何的操作,而是分别创建了一个Stage来标识总户的每一次操作。

    而通总情况下Stream的操作又要一个回调函数,所以一个完整的Stage是由数据来源、操作、回调函数组成的三元组来表示。如下图所示,分别是ReferencePipeline的filter方法和map方法:

    @Override
        public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
            Objects.requireNonNull(predicate);
            return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
                                         StreamOpFlag.NOT_SIZED) {
                @Override
                Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
                    return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
                        @Override
                        public void begin(long size) {
                            downstream.begin(-1);
                        }
    
                        @Override
                        public void accept(P_OUT u) {
                            if (predicate.test(u))
                                downstream.accept(u);
                        }
                    };
                }
            };
        }
    
        @Override
        @SuppressWarnings("unchecked")
        public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
            Objects.requireNonNull(mapper);
            return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
                                         StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
                @Override
                Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
                    return new Sink.ChainedReference<P_OUT, R>(sink) {
                        @Override
                        public void accept(P_OUT u) {
                            downstream.accept(mapper.apply(u));
                        }
                    };
                }
            };
        }
    

    newStatelessOp将令调用父类AbstractPipeline的构造函数,这个构造函数将前后的Stage联系起来,生成一个Stage链表:

        /**
         * Constructor for appending an intermediate operation stage onto an
         * existing pipeline.
         *
         * @param previousStage the upstream pipeline stage
         * @param opFlags the operation flags for the new stage, described in
         * {@link StreamOpFlag}
         */
        AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {
            if (previousStage.linkedOrConsumed)
                throw new IllegalStateException(MSG_STREAM_LINKED);
            previousStage.linkedOrConsumed = true;
            previousStage.nextStage = this;
    
            this.previousStage = previousStage;
            this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;
            this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);
            this.sourceStage = previousStage.sourceStage;
            if (opIsStateful())
                sourceStage.sourceAnyStateful = true;
            this.depth = previousStage.depth + 1;
        }
    

    因为在创建每一个Stage时,都会包含一个opWrapSink()方法,该方法会把一个操作的具体实现封装在Sink类中,Sink采用(处理一>转发)的模式来叠加操作。

    当执行了max方法时,会调用ReferencePipeline的max方法,此时由于ma×方法是终结操作,所以会创建一个TerminalOp操作,同时创建一个ReducingSink,并且将操作封装在Sink类中。

    @Override
    public final Optional<P_OUT> max(Comparator<? super P_OUT> comparator) {
        return reduce(BinaryOperator.maxBy(comparator));
    }
    

    最调用AbstractPipeline的wrapSink方法,该方法会调用opWrapSink生成一一Sink链表,Sink链表中的每一个Sink都封装了一个操作的具体实现。

        @Override
        @SuppressWarnings("unchecked")
        final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
            Objects.requireNonNull(sink);
    
            for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
                sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
            }
            return (Sink<P_IN>) sink;
        }
    

    当Sink链表生成完成后Stream开执行,通过spliterator迭代集合,执行Sink链表中的具体操作。

    @Override
        @SuppressWarnings("unchecked")
        final <P_IN> void copyIntoWithCancel(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
            @SuppressWarnings({"rawtypes","unchecked"})
            AbstractPipeline p = AbstractPipeline.this;
            while (p.depth > 0) {
                p = p.previousStage;
            }
            wrappedSink.begin(spliterator.getExactSizeIfKnown());
            p.forEachWithCancel(spliterator, wrappedSink);
            wrappedSink.end();
        }
    

    Java8的Spliterator的forEachRemaining会迭代集合,每迭代一次,都会执行一次filter操作,如果filter作通过,就会触发map操作,然后将结果放入到临时数组object,再进行下一次的迭代。完成中间操作后,就会触发终结操作max。以上就是Stream的串行处理方式了。

    Stream并行处理

    要实现Stream的并行处理,我们只要在例子中的代码中新增一个Parallel()方法。

    List<String> names = Arrays.asList("张三","李四","王老五","李三","刘老四","王小二","张四","张五六七");
    
    String maxLenStarWithZ = names.stream()
        .parallel()
        .filter(name -> name.startsWith("张"))
        .mapToInt(String::length)
        .max()
        .toString();
    

    Stream的并行处理在执行终结操作之前,跟串行处理的实现是一样的。而在调用终结方法之后,实现的方式就有点不太一样,会调用TerminalOp的evaluateParallel方法进行并行处理。

        /**
         * Evaluate the pipeline with a terminal operation to produce a result.
         *
         * @param <R> the type of result
         * @param terminalOp the terminal operation to be applied to the pipeline.
         * @return the result
         */
        final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
            assert getOutputShape() == terminalOp.inputShape();
            if (linkedOrConsumed)
                throw new IllegalStateException(MSG_STREAM_LINKED);
            linkedOrConsumed = true;
    
            return isParallel()
                   ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
                   : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
        }
    

    送里的并行处理指的是Stream结合了ForkJoin框架对Stream处理进行了分片,Splititerator中的estimateSize方法会估算出分片的数据量。

    ForkJoin框架和估算算法,可以深入源码分析下该算法的实现。

    通过预估的数据量获取最小处理单元的值,如果当前分片大小大于最小处理单元的值,就继续切分集合每个分片将会生成一个Sink链表,当所有的分片操作完成后ForkJoin框架将会合并分片任何结果集。

    合理使用Stream

    StreamAPI用起来简洁,还能并行处理,我们将对常规的迭代、Stream串行迭代以及Stream并行迭代进行性能测试对比,迭代循环中,我们将对数据进行过滤分组等操作。分别进行以下几组测试:

    测试环境 数据长度 数据类型 测试结果(ms)
    多核CPU 100 int数组 常规<Stream并行<Stream串行
    多核CPU 1.00E+8 int数组 Stream并行<常规<Stream串行
    多核CPU 1.00E+8 对象数组 Stream并行<常规<Stream串行
    单核CPU 1.00E+8 对象数组 常规<Stream串行<Stream并行

    从上述测试可以看出,在多核大批量数据的情况下,Stream并行操作的性能要好一些,盲目使用Stream未必可以使系统性能更佳,还是要结合具体场景进行选择。

    总结

    纵观Stream的设计实现,非常值得我们学习。从大的设计方向上来说,Stream将整个操作分解为了链式结构,不仅简化了遍历操作,还为实现了并行计算打下了基础。

    从小的分类方向上来说,Stream将遍历元素的操作和对元素的计算分为中间操作和终结操作,而中间操作又根据元素之间状态有无干扰分为有状态和无状态操作,实现了链结构中的不同阶段。、

    在串行处理作中,Stream在执行每一步中间操作时,并不会做实际的数据操作处理,而是将这些中间操作串联起来最终由终结操作触发,生成一个数据处理链表通过Java8的Spliterator迭代器进行数据处理;此时,执行一次迭代,就对所有的无状态的中间操作进行数据处理,而对有状态的甲间操作,就要迭代处理完所有的数据,再进行处理作;最后就是进行终结操作的数据处理。

    在并行处理操作中,Stream对中间操作基本跟串行处理方式是一样的,但在终结操作中,Stream将结合ForkJoin框架对集合进行切片处理,ForkJoin框架将每个切片的处理结果Join合并起来。最后就是要注意Stream的使用场景。

    相关文章

      网友评论

          本文标题:系统优化专题5——Stream

          本文链接:https://www.haomeiwen.com/subject/phrwuktx.html