美文网首页故里学Javaspring
Stream流如何提高遍历集合效率

Stream流如何提高遍历集合效率

作者: 故里学Java | 来源:发表于2020-10-26 00:12 被阅读0次

Stream是Java8的新特性,相当于是高级版的Iterator,可以通过Lambda表达式对集合进行各种非常便利、高效的聚合操作,或者大批量数据操作。Stream的聚合操作与数据库SQL的聚合操作类似。我们可以在应用层就可以实现类似数据库的聚合操作,在数据处理方面,Stream不仅支持串行的方式,还支持并行的方式,在大批量数据的情况下使用并行操作可以显著的提高效率。

先Stream的简洁与强大:

举例:过滤分组一天中所有的销售订单中已支付未发货的订单,先用传统的for循环的方式来实现:

  HashMap<String, List<SaleOrder>> orderMap = new HashMap<String, List<SaleOrder>>();
        for (SaleOrder order : orderList) {
            if (order.getIsPay) {
                if (order.get(order.getDeliver()) == null) {  //该发货状态还没分类
                    List<SaleOrder> list = new ArryList<SaleOrder>();
                    list.add(order);
                    orderMap.put(order.getDeliver(),list);
                }else {
                    orderMap.get(order.getDeliver()).add(order);
                }
            }
        }

我们在使用Java8中的StreamAPI实现:

  1. 串行实现
HashMap<String, List<SaleOrder>> orderMap = orderList.stream().filter((SaleOrder order) -> order.getIsPay).collect(Collectors.groupingBy(SaleOrder::getDeliver));
  1. 并行实现
HashMap<String, List<SaleOrder>> orderMap = orderList.parallelStream().filter((SaleOrder order) -> order.getIsPay).collect(Collectors.groupingBy(SaleOrder::getDeliver));

通过简单的例子,我们可以看到Stream结合Lambda表达式实现的遍历筛选功能非常的简洁。

使用Stream实现遍历非常简单,但是如何让遍历的效率更高,我们还需要透过源码看Stream的实现原理。

Stream介绍

通过官方文档我们可以了解到,Stream操作可以分为两大类:中间操作和终结操作。中间操作只对操作进行记录,只会返回一个流。不进行计算操作,而终结操作是实现了计算操作的。

中间操作又可以分为无状态(Stateless)操作(比如:filter、map、flatMap等)与有状态(Stateful)操作(如:distinct、sorted、limit),无状态操作是指元素的处理不受之前元素的影响,有状态是指操作只有拿到所有元素之后才能继续下去。

终结操作包含了非短路操作(short-circuiting)(如:forEach、reduce、collect)与短路操作(如:findFirst、findAny)。短路终结操作指的是不用处理所有元素才能返回结果,比如findFirst,只要找到第一个符合条件的元素就返回结果。非短路终结操作则必须处理完所有的元素才能返回。

Stream源码实现

先通过一个类图了解Stream是由哪些结构类组成的。

BaseStreamStream是顶级的接口类,BaseStream主要定义了流的基本接口方法,如:spliterator、isParallel等;Stream则定义了一些流的常用操作方法,如:map、filter等。

ReferencePipeline是描述中间操作管道流和源管道流的一个结构类,它通过定义内部类组装了各种操作流。它定义了Head、StatelessOp、StatefulOp三个内部类,实现了BaseStream和Stream的接口方法。

这里其实还有一个很重要的接口就是Sink接口,定义了每个Stream之间关系的协议,包含begin()、end()、cancellationRequested()、accpt()四个方法。ReferencePipeline最终将整个Stream流操作组装成一个调用链,而这条调用链上的各个Stream操作的上下级关系是通过Sink接口协议来定义实现的。

Stream操作叠加

我们都知道,一个Stream的各个操作都是由处理管道组装,并统一完成数据处理的,在JDK中每次的中断操作会以使用阶段(Stage)命名。

管道结构是由ReferencePipeline类实现,前面已经说了它有三个内部类。Head类主要用来定义数据源操作,在我们初次调用names.stream()方法时,会初次加载Head对象,此时位加载数据源操作;接着加载的是中间操作,分别为无状态中间操作StatelessOp对象和有状态操作对象StatefulOp对象,此时的Stage并没有执行,而是通过AbstractPipeline生成了一个中间操作Stage链表;当我们调用中间操作的时候会生成最后一个Stage,通过最后一个终结Stage触发之前的中间操作,从最后一个Stage开始递归产生一个Sink链,就像这样:

下边通过一个例子实践一下:

需求:在一个集合中查出一个长度最长的并且以张为姓氏的名字。

List<String> names = Arrays.asList("张三", "李四", "王老五", "李三", "刘老四", "王小二", "张四", "张五六七");

String maxLenStartWithZ = names.stream()
                  .filter(name -> name.startsWith("张"))
                  .mapToInt(String::length)
                  .max()
                  .toString();

第一步:因为names是一个ArrayList集合,使用names.stream()方法将会调用集合类基础接口Collection的Stream方法;

    default Stream<E> stream() {
        return StreamSupport.stream(spliterator(), false);
    }

第二步:Stream方法就会调用StreamSupport类的Stream方法,方法中初始化了ReferencePipeline的Head内部类对象:


    public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {
        Objects.requireNonNull(spliterator);
        return new ReferencePipeline.Head<>(spliterator,
                                            StreamOpFlag.fromCharacteristics(spliterator),
                                            parallel);
    }

第三步:调用filter和map方法,这两个方法是无状态的中间操作,所以执行filter和map操作时,并没有进行任何的操作,而是分别创建了一个Stage来标识每一步操作。

通常情况下Stream的操作又需要一个回调函数,所以一个完整的Stage是由数据来源、操作、回调函数组成的三元组来表示。ReferencePipeline的filter和map方法:

    @Override
    public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
        Objects.requireNonNull(predicate);
        return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
                                     StreamOpFlag.NOT_SIZED) {
            @Override
            Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
                return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
                    @Override
                    public void begin(long size) {
                        downstream.begin(-1);
                    }

                    @Override
                    public void accept(P_OUT u) {
                        if (predicate.test(u))
                            downstream.accept(u);
                    }
                };
            }
        };
    }
    @Override
    @SuppressWarnings("unchecked")
    public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
        Objects.requireNonNull(mapper);
        return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
                                     StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
            @Override
            Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
                return new Sink.ChainedReference<P_OUT, R>(sink) {
                    @Override
                    public void accept(P_OUT u) {
                        downstream.accept(mapper.apply(u));
                    }
                };
            }
        };
    }

new StatelessOp将调用父类AbstractPipeline的构造函数,这个构造函数将前后的Stage链接起来,生成一个Stage链表。

    AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {
        if (previousStage.linkedOrConsumed)
            throw new IllegalStateException(MSG_STREAM_LINKED);
        previousStage.linkedOrConsumed = true;
        previousStage.nextStage = this;//将当前的stage的next指针指向之前的stage

        this.previousStage = previousStage;//赋值当前stage当全局变量previousStage 
        this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;
        this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);
        this.sourceStage = previousStage.sourceStage;
        if (opIsStateful())
            sourceStage.sourceAnyStateful = true;
        this.depth = previousStage.depth + 1;
    }

因为在创建每一个Stage的时候都会包含一个opWrapSink()方法,该方法会把一个操作的具体实现封装到Sink类中,Sink采用(处理->转发)的模式来叠加操作。

当执行到max()时,会调用ReferencePipeline的max方法,此时由于max方法是一个终结操作,所以会创建一个TerminalOp操作,同时创建一个ReducingSink,并且将操作封装在Sink类中。

最后,调用AbstractPipeline的wrapSink方法,该方法会调用opWrapSink生成一个Sink链表,Sink链表中的每一个Sink都封装了一个操作的具体实现。

    @Override
    @SuppressWarnings("unchecked")
    final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
        Objects.requireNonNull(sink);

        for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
            sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
        }
        return (Sink<P_IN>) sink;
    }

在Sink链表生成以后,Stream开始执行,通过spliterator迭代集合,执行Sink链表的具体操作。

    @Override
    final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
        Objects.requireNonNull(wrappedSink);

        if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
            wrappedSink.begin(spliterator.getExactSizeIfKnown());
            spliterator.forEachRemaining(wrappedSink);
            wrappedSink.end();
        }
        else {
            copyIntoWithCancel(wrappedSink, spliterator);
        }
    }

Java8 中的 Spliterator 的 forEachRemaining 会迭代集合,每迭代一次,都会执行一次 filter 操作,如果 filter 操作通过,就会触发 map 操作,然后将结果放入到临时数组 object 中,再进行下一次的迭代。完成中间操作后,就会触发终结操作 max

Stream并行处理

还是刚刚的demo需求,只需稍作修改就可以变成并行处理

List<String> names = Arrays.asList("张三", "李四", "王老五", "李三", "刘老四", "王小二", "张四", "张五六七");

String maxLenStartWithZ = names.stream()
                    .parallel()
                  .filter(name -> name.startsWith("张"))
                  .mapToInt(String::length)
                  .max()
                  .toString();

在执行终结操作之前都跟串行操作的一样,主要的不同就是在调用终结方法之后,并行处理会调用TerminalOp的evaluateParallel方法进行并行处理;

    final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
        assert getOutputShape() == terminalOp.inputShape();
        if (linkedOrConsumed)
            throw new IllegalStateException(MSG_STREAM_LINKED);
        linkedOrConsumed = true;

        return isParallel()
               ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
               : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
    }

这里的并行处理指的是,Stream 结合了 ForkJoin 框架,对 Stream 处理进行了分片,Splititerator 中的 estimateSize 方法会估算出分片的数据量。通过预估的数据量获取最小处理单元的阈值,如果当前分片大小大于最小处理单元的阈值,就继续切分集合。每个分片将会生成一个 Sink 链表,当所有的分片操作完成后,ForkJoin 框架将会合并分片任何结果集。

合理使用Stream

综上,对Stream有了一定的认识,但是实际使用中我们该如何选择,还是需要一个测试说明:

  • 多核 CPU 服务器配置环境下,对比长度 100 的 int 数组的性能;

  • 多核 CPU 服务器配置环境下,对比长度 1.00E+8 的 int 数组的性能;

  • 多核 CPU 服务器配置环境下,对比长度 1.00E+8 对象数组过滤分组的性能;

  • 单核 CPU 服务器配置环境下,对比长度 1.00E+8 对象数组过滤分组的性能。

多次测试结果:

  • 常规的迭代 <Stream 并行迭代 <Stream 串行迭代

  • Stream 并行迭代 < 常规的迭代 <Stream 串行迭代

  • Stream 并行迭代 < 常规的迭代 <Stream 串行迭代

  • 常规的迭代 <Stream 串行迭代 <Stream 并行迭代

以上测试结果,我们可以看到:在循环迭代次数较少的情况下,常规的迭代方式性能反而更好;在单核 CPU 服务器配置环境中,也是常规迭代方式更有优势;而在大数据循环迭代中,如果服务器是多核 CPU 的情况下,Stream 的并行迭代优势明显。所以我们在平时处理大数据的集合时,应该尽量考虑将应用部署在多核 CPU 环境下,并且使用 Stream 的并行迭代方式进行处理。

具体使用哪种方式我们还是需要结合应用场景进行选择。

相关文章

  • Stream流如何提高遍历集合效率

    Stream是Java8的新特性,相当于是高级版的Iterator,可以通过Lambda表达式对集合进行各种非常便...

  • Stream如何提高遍历集合效率?

    什么是 Stream? 现在很多大数据量系统中都存在分表分库的情况。例如,电商系统中的订单表,常常使用用户 ID ...

  • JDK 1.8 新特性学习(Stream)

    Stream实现了对数据源的流式处理,它可以并行操作,提高数据处理效率。 什么是流 流不是集合,它不对数据做保存,...

  • 八 JDK8新特性——第二节 Stream流式思想

    1、使用传统的方式遍历集合 Stream 流有别于I/O流,是对集合和数组进行操作,解决集合数组的弊端 2、使用S...

  • Java流操作总结

    Java流(Stream)操作自Java 8引入,通过Stream操作可以简化代码编写,提高代码执行效率。流整体操...

  • Stream

    假设此时要对以下的集合进行各种操作,Stream就可以极大的提高我们的效率。 获取指定数量的流 通过设置的条件过滤...

  • Stream流

    流式思想 Stream流的简单尝试 传统for循环遍历的方法 Steam流的方式 获取stream流 stream...

  • Stream 使用这么久,它是如何提高遍历集合效率的?

    对于 List 集合类,我想大家肯定很了解了,那我想一定也知道集合的顶端接口 Collection。在 Java8...

  • Java8 Stream 基础

    1. 流(Stream) 流(Stream)中保存对集合或数组数据的操作。和集合类似,但集合中保存的是数据 流到底...

  • 2021-03-30什么是Stream

    流(Stream)中保存对集合或数组数据的操作。和集合类似,但集合中保存的是数据。Stream特点1.Stream...

网友评论

    本文标题:Stream流如何提高遍历集合效率

    本文链接:https://www.haomeiwen.com/subject/jiuomktx.html