美文网首页故里学Javaspring
Stream流如何提高遍历集合效率

Stream流如何提高遍历集合效率

作者: 故里学Java | 来源:发表于2020-10-26 00:12 被阅读0次

    Stream是Java8的新特性,相当于是高级版的Iterator,可以通过Lambda表达式对集合进行各种非常便利、高效的聚合操作,或者大批量数据操作。Stream的聚合操作与数据库SQL的聚合操作类似。我们可以在应用层就可以实现类似数据库的聚合操作,在数据处理方面,Stream不仅支持串行的方式,还支持并行的方式,在大批量数据的情况下使用并行操作可以显著的提高效率。

    先Stream的简洁与强大:

    举例:过滤分组一天中所有的销售订单中已支付未发货的订单,先用传统的for循环的方式来实现:

      HashMap<String, List<SaleOrder>> orderMap = new HashMap<String, List<SaleOrder>>();
            for (SaleOrder order : orderList) {
                if (order.getIsPay) {
                    if (order.get(order.getDeliver()) == null) {  //该发货状态还没分类
                        List<SaleOrder> list = new ArryList<SaleOrder>();
                        list.add(order);
                        orderMap.put(order.getDeliver(),list);
                    }else {
                        orderMap.get(order.getDeliver()).add(order);
                    }
                }
            }
    

    我们在使用Java8中的StreamAPI实现:

    1. 串行实现
    HashMap<String, List<SaleOrder>> orderMap = orderList.stream().filter((SaleOrder order) -> order.getIsPay).collect(Collectors.groupingBy(SaleOrder::getDeliver));
    
    1. 并行实现
    HashMap<String, List<SaleOrder>> orderMap = orderList.parallelStream().filter((SaleOrder order) -> order.getIsPay).collect(Collectors.groupingBy(SaleOrder::getDeliver));
    

    通过简单的例子,我们可以看到Stream结合Lambda表达式实现的遍历筛选功能非常的简洁。

    使用Stream实现遍历非常简单,但是如何让遍历的效率更高,我们还需要透过源码看Stream的实现原理。

    Stream介绍

    通过官方文档我们可以了解到,Stream操作可以分为两大类:中间操作和终结操作。中间操作只对操作进行记录,只会返回一个流。不进行计算操作,而终结操作是实现了计算操作的。

    中间操作又可以分为无状态(Stateless)操作(比如:filter、map、flatMap等)与有状态(Stateful)操作(如:distinct、sorted、limit),无状态操作是指元素的处理不受之前元素的影响,有状态是指操作只有拿到所有元素之后才能继续下去。

    终结操作包含了非短路操作(short-circuiting)(如:forEach、reduce、collect)与短路操作(如:findFirst、findAny)。短路终结操作指的是不用处理所有元素才能返回结果,比如findFirst,只要找到第一个符合条件的元素就返回结果。非短路终结操作则必须处理完所有的元素才能返回。

    Stream源码实现

    先通过一个类图了解Stream是由哪些结构类组成的。

    BaseStreamStream是顶级的接口类,BaseStream主要定义了流的基本接口方法,如:spliterator、isParallel等;Stream则定义了一些流的常用操作方法,如:map、filter等。

    ReferencePipeline是描述中间操作管道流和源管道流的一个结构类,它通过定义内部类组装了各种操作流。它定义了Head、StatelessOp、StatefulOp三个内部类,实现了BaseStream和Stream的接口方法。

    这里其实还有一个很重要的接口就是Sink接口,定义了每个Stream之间关系的协议,包含begin()、end()、cancellationRequested()、accpt()四个方法。ReferencePipeline最终将整个Stream流操作组装成一个调用链,而这条调用链上的各个Stream操作的上下级关系是通过Sink接口协议来定义实现的。

    Stream操作叠加

    我们都知道,一个Stream的各个操作都是由处理管道组装,并统一完成数据处理的,在JDK中每次的中断操作会以使用阶段(Stage)命名。

    管道结构是由ReferencePipeline类实现,前面已经说了它有三个内部类。Head类主要用来定义数据源操作,在我们初次调用names.stream()方法时,会初次加载Head对象,此时位加载数据源操作;接着加载的是中间操作,分别为无状态中间操作StatelessOp对象和有状态操作对象StatefulOp对象,此时的Stage并没有执行,而是通过AbstractPipeline生成了一个中间操作Stage链表;当我们调用中间操作的时候会生成最后一个Stage,通过最后一个终结Stage触发之前的中间操作,从最后一个Stage开始递归产生一个Sink链,就像这样:

    下边通过一个例子实践一下:

    需求:在一个集合中查出一个长度最长的并且以张为姓氏的名字。

    List<String> names = Arrays.asList("张三", "李四", "王老五", "李三", "刘老四", "王小二", "张四", "张五六七");
    
    String maxLenStartWithZ = names.stream()
                      .filter(name -> name.startsWith("张"))
                      .mapToInt(String::length)
                      .max()
                      .toString();
    

    第一步:因为names是一个ArrayList集合,使用names.stream()方法将会调用集合类基础接口Collection的Stream方法;

        default Stream<E> stream() {
            return StreamSupport.stream(spliterator(), false);
        }
    

    第二步:Stream方法就会调用StreamSupport类的Stream方法,方法中初始化了ReferencePipeline的Head内部类对象:

    
        public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {
            Objects.requireNonNull(spliterator);
            return new ReferencePipeline.Head<>(spliterator,
                                                StreamOpFlag.fromCharacteristics(spliterator),
                                                parallel);
        }
    

    第三步:调用filter和map方法,这两个方法是无状态的中间操作,所以执行filter和map操作时,并没有进行任何的操作,而是分别创建了一个Stage来标识每一步操作。

    通常情况下Stream的操作又需要一个回调函数,所以一个完整的Stage是由数据来源、操作、回调函数组成的三元组来表示。ReferencePipeline的filter和map方法:

        @Override
        public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
            Objects.requireNonNull(predicate);
            return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
                                         StreamOpFlag.NOT_SIZED) {
                @Override
                Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
                    return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
                        @Override
                        public void begin(long size) {
                            downstream.begin(-1);
                        }
    
                        @Override
                        public void accept(P_OUT u) {
                            if (predicate.test(u))
                                downstream.accept(u);
                        }
                    };
                }
            };
        }
    
        @Override
        @SuppressWarnings("unchecked")
        public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
            Objects.requireNonNull(mapper);
            return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
                                         StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
                @Override
                Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
                    return new Sink.ChainedReference<P_OUT, R>(sink) {
                        @Override
                        public void accept(P_OUT u) {
                            downstream.accept(mapper.apply(u));
                        }
                    };
                }
            };
        }
    

    new StatelessOp将调用父类AbstractPipeline的构造函数,这个构造函数将前后的Stage链接起来,生成一个Stage链表。

        AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {
            if (previousStage.linkedOrConsumed)
                throw new IllegalStateException(MSG_STREAM_LINKED);
            previousStage.linkedOrConsumed = true;
            previousStage.nextStage = this;//将当前的stage的next指针指向之前的stage
    
            this.previousStage = previousStage;//赋值当前stage当全局变量previousStage 
            this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;
            this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);
            this.sourceStage = previousStage.sourceStage;
            if (opIsStateful())
                sourceStage.sourceAnyStateful = true;
            this.depth = previousStage.depth + 1;
        }
    

    因为在创建每一个Stage的时候都会包含一个opWrapSink()方法,该方法会把一个操作的具体实现封装到Sink类中,Sink采用(处理->转发)的模式来叠加操作。

    当执行到max()时,会调用ReferencePipeline的max方法,此时由于max方法是一个终结操作,所以会创建一个TerminalOp操作,同时创建一个ReducingSink,并且将操作封装在Sink类中。

    最后,调用AbstractPipeline的wrapSink方法,该方法会调用opWrapSink生成一个Sink链表,Sink链表中的每一个Sink都封装了一个操作的具体实现。

        @Override
        @SuppressWarnings("unchecked")
        final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
            Objects.requireNonNull(sink);
    
            for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
                sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
            }
            return (Sink<P_IN>) sink;
        }
    
    

    在Sink链表生成以后,Stream开始执行,通过spliterator迭代集合,执行Sink链表的具体操作。

        @Override
        final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
            Objects.requireNonNull(wrappedSink);
    
            if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
                wrappedSink.begin(spliterator.getExactSizeIfKnown());
                spliterator.forEachRemaining(wrappedSink);
                wrappedSink.end();
            }
            else {
                copyIntoWithCancel(wrappedSink, spliterator);
            }
        }
    

    Java8 中的 Spliterator 的 forEachRemaining 会迭代集合,每迭代一次,都会执行一次 filter 操作,如果 filter 操作通过,就会触发 map 操作,然后将结果放入到临时数组 object 中,再进行下一次的迭代。完成中间操作后,就会触发终结操作 max

    Stream并行处理

    还是刚刚的demo需求,只需稍作修改就可以变成并行处理

    List<String> names = Arrays.asList("张三", "李四", "王老五", "李三", "刘老四", "王小二", "张四", "张五六七");
    
    String maxLenStartWithZ = names.stream()
                        .parallel()
                      .filter(name -> name.startsWith("张"))
                      .mapToInt(String::length)
                      .max()
                      .toString();
    

    在执行终结操作之前都跟串行操作的一样,主要的不同就是在调用终结方法之后,并行处理会调用TerminalOp的evaluateParallel方法进行并行处理;

        final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
            assert getOutputShape() == terminalOp.inputShape();
            if (linkedOrConsumed)
                throw new IllegalStateException(MSG_STREAM_LINKED);
            linkedOrConsumed = true;
    
            return isParallel()
                   ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
                   : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
        }
    

    这里的并行处理指的是,Stream 结合了 ForkJoin 框架,对 Stream 处理进行了分片,Splititerator 中的 estimateSize 方法会估算出分片的数据量。通过预估的数据量获取最小处理单元的阈值,如果当前分片大小大于最小处理单元的阈值,就继续切分集合。每个分片将会生成一个 Sink 链表,当所有的分片操作完成后,ForkJoin 框架将会合并分片任何结果集。

    合理使用Stream

    综上,对Stream有了一定的认识,但是实际使用中我们该如何选择,还是需要一个测试说明:

    • 多核 CPU 服务器配置环境下,对比长度 100 的 int 数组的性能;

    • 多核 CPU 服务器配置环境下,对比长度 1.00E+8 的 int 数组的性能;

    • 多核 CPU 服务器配置环境下,对比长度 1.00E+8 对象数组过滤分组的性能;

    • 单核 CPU 服务器配置环境下,对比长度 1.00E+8 对象数组过滤分组的性能。

    多次测试结果:

    • 常规的迭代 <Stream 并行迭代 <Stream 串行迭代

    • Stream 并行迭代 < 常规的迭代 <Stream 串行迭代

    • Stream 并行迭代 < 常规的迭代 <Stream 串行迭代

    • 常规的迭代 <Stream 串行迭代 <Stream 并行迭代

    以上测试结果,我们可以看到:在循环迭代次数较少的情况下,常规的迭代方式性能反而更好;在单核 CPU 服务器配置环境中,也是常规迭代方式更有优势;而在大数据循环迭代中,如果服务器是多核 CPU 的情况下,Stream 的并行迭代优势明显。所以我们在平时处理大数据的集合时,应该尽量考虑将应用部署在多核 CPU 环境下,并且使用 Stream 的并行迭代方式进行处理。

    具体使用哪种方式我们还是需要结合应用场景进行选择。

    相关文章

      网友评论

        本文标题:Stream流如何提高遍历集合效率

        本文链接:https://www.haomeiwen.com/subject/jiuomktx.html