Stream原理研究

作者: 李不言被占用了 | 来源:发表于2019-11-16 22:02 被阅读0次

几个概念

需要了解StreamPipelineStageSink几个概念。

Stream

Stream就是一系列元素。是怎样的一系列元素呢?是支持顺序和并行聚集操作的一系列元素。
Stream定义了一些中间操作(Intermediate operations)和结束操作(Terminal operations),
中间操作包括无状态(Stateless)操作比如:filter, map, flatMap等,有状态(Stateful)操作比如:distinct, sorted, limit等;
结束操作(Terminal operations)包括非短路操作(short-circuiting)比如:forEach, reduce, collect等和短路操作如:findFirst, findAny

这些操作需要被按顺序记录下来,这就需要Pipeline了。

Pipeline

Pipeline就是管道的概念。
管道有一个基类PipelineHelper,他是执行Stream管道的一个helper,将Stream的所有信息收集到一个地方。

上面所说的操作其实都定义在PipelineHelper的一个子类ReferencePipeline中,包括Head(Source stage of a ReferencePipeline)、StatelessOp(Base class for a stateless intermediate stage of a Stream.)、StatefulOp(Base class for a stateful intermediate stage of a Stream.)静态内部类。

Stage

Stream中使用Stage的概念来描述一个完整的操作,并用某种实例化后的PipelineHelper来代表Stage,将具有先后顺序的各个Stage连到一起,就构成了整个流水线。

简单理解就是每一个操作就会产生一个stage

image.png

Sink

现在有了Stage了,如何串联起这些stage呢?通过Sink
java.util.stream.Sink.ChainedReference来看,里面有一个downstream的实例成员,只要上游的sink调用下游的accept()方法,即可保证串联起来执行:即每一个上游的sink执行完自己的accept()逻辑以后,调用下游sinkaccept()方法(downstream.accept())。

通过stage构建起来的双向链表只是给所有stage关联在一起,每个stage都知道自己前后有没有stage存在,却没法知道后面stage到底执行了哪种操作,以及回调函数是哪种形式。这就好像一个网络环境,根据iptables我知道了我下游的路由器在哪里,那我怎么把数据传递给它呢?通过协议,TCP协议!这么看的话,Sink更像是一个协议,让每一个stage执行完以后,知道调用某个方法来将数据传递给下一个stage,也就是Sink#accept()

类图1

通过代码来讲讲:

@Test
public void testFilter() {
    Stream.of(1, 2, 3, 4, 5)
            .filter(item -> item > 3)
            .forEach(System.out::println);// 打印结果:4,5
}

其实这段代码等价于:

@Test
public void testFilter() {
    Stream<Integer> head = Stream.of(1, 2, 3, 4, 5);
    Stream<Integer> filterStream = head.filter(i -> i > 3);
    filterStream.forEach(System.out::println);
    filterStream.count();
}

可以看到,前面每一步都会返回一个新的Stream,严格的说,其实是Stream的子类,更严格说,是ReferencePipeline的子类,参见上图类图1。具体如下:

// java.util.Arrays#stream(T[], int, int)
// 创建一个Stream实例
public static <T> Stream<T> stream(T[] array, int startInclusive, int endExclusive) {
    //  spliterator方法创建一个ArraySpliterator,后面会关注到ArraySpliterator#forEachRemaining
    return StreamSupport.stream(spliterator(array, startInclusive, endExclusive), false);
}

// java.util.stream.StreamSupport#stream(java.util.Spliterator<T>, boolean)
public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {
    Objects.requireNonNull(spliterator);
    // 创建一个管道(pipeline)的头节点
    return new ReferencePipeline.Head<>(spliterator,
                                        StreamOpFlag.fromCharacteristics(spliterator),
                                        parallel);
}

通过上面的代码可以看到,真正返回的是ReferencePipeline.Head的实例,当然,就是Stream的子类。
接着看filter()方法:

// java.util.stream.ReferencePipeline#filter
@Override
public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
    Objects.requireNonNull(predicate);
    // filter是一个无状态操作(StatelessOp),StatelessOp是管道的一个节点,也是Stream的子类
    // 这段代码重点关注返回的StatelessOp实例,this是当前Stream实例,即上面代码Stream#of方法产生的实例,也就是ReferencePipeline.Head实例
    // 通过这个方法串联起新建节点StatelessOp与ReferencePipeline.Head的关系,即StatelessOp#previousStage=ReferencePipeline.Head
    return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
                                 StreamOpFlag.NOT_SIZED) {
        // 这个方法先不关注,后面还会说到
        @Override
        Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
            return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
                @Override
                public void begin(long size) {
                    downstream.begin(-1);
                }

                @Override
                public void accept(P_OUT u) {
                    if (predicate.test(u))
                        downstream.accept(u);
                }
            };
        }
    };
}

filter方法的默认实现是在ReferencePipeline中实现的。上面这段代码我们需要重点关注的是,返回了一个StatelessOp对象,也是ReferencePipeline的子类。关注一下构造StatelessOp的过程:

// java.util.stream.ReferencePipeline.StatelessOp#StatelessOp
StatelessOp(AbstractPipeline<?, E_IN, ?> upstream,
            StreamShape inputShape,
            int opFlags) {
    super(upstream, opFlags);
    assert upstream.getOutputShape() == inputShape;
}

// java.util.stream.ReferencePipeline#ReferencePipeline(java.util.stream.AbstractPipeline<?,P_IN,?>, int)
ReferencePipeline(AbstractPipeline<?, P_IN, ?> upstream, int opFlags) {
    super(upstream, opFlags);
}

// java.util.stream.AbstractPipeline#AbstractPipeline(java.util.stream.AbstractPipeline<?,E_IN,?>, int)
AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {
    if (previousStage.linkedOrConsumed)
        throw new IllegalStateException(MSG_STREAM_LINKED);
    previousStage.linkedOrConsumed = true;
    // 这里是关联stage
    previousStage.nextStage = this;
    this.previousStage = previousStage;
    
    this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;
    this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);
    this.sourceStage = previousStage.sourceStage;// sourceStage是ReferencePipeline.Head
    if (opIsStateful())
        sourceStage.sourceAnyStateful = true;
    this.depth = previousStage.depth + 1;
}

通过上面的操作,形成了这样的双向链表:
ReferencePipeline.Head <-> StatelessOp
一般后面在有其他操作,例如map()sort()limit()等,都是返回一个ReferencePipeline实例(StatefulOp/StatelessOp),在构造方法里面将双向链表串联起来。

最后来看forEach()方法:

// java.util.stream.ReferencePipeline#forEach
@Override
public void forEach(Consumer<? super P_OUT> action) {
    // 通过工厂ForEachOps返回一个ForEachOp实例(下面会提到),本质是一个TerminalOp,即:构造了一个终止操作实例
    evaluate(ForEachOps.makeRef(action, false));
}

evaluate()方法就是一个执行Stream的操作。

// java.util.stream.AbstractPipeline#evaluate(java.util.stream.TerminalOp<E_OUT,R>)
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
    assert getOutputShape() == terminalOp.inputShape();
    if (linkedOrConsumed)
        throw new IllegalStateException(MSG_STREAM_LINKED);
    linkedOrConsumed = true;

    return isParallel()
           ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
           // 关注顺序执行这里
           // this是由filter方法返回的Stream,严格说是PipelineHelper,terminalOp是上面提到的ForEachOp实例
           : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
}

现在我们只关注顺序执行这一段:

// java.util.stream.ForEachOps.ForEachOp#evaluateSequential
@Override
public <S> Void evaluateSequential(PipelineHelper<T> helper,
                                   Spliterator<S> spliterator) {
    // this是上面提到的ForEachOp实例,helper是filter方法返回的PipelineHelper
    return helper.wrapAndCopyInto(this, spliterator).get();
}

wrapAndCopyInto方法主要干了两件事:

  1. 封装sink链;
  2. 将封装好的sink链从头到尾执行。
// 串联sink并执行
// 前面提到反向串联,这里是顺序串联
// 这个是helper的实例方法,也就是filter()方法构造出来的实例对象
// java.util.stream.AbstractPipeline#wrapAndCopyInto
@Override
final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) {
    // 这里的sink是上面提到的ForEachOp实例
    copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
    return sink;
}

先看看对sink链的封装:

// java.util.stream.AbstractPipeline#wrapSink
// sink是ForEachOp实例
@Override
@SuppressWarnings("unchecked")
final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
    // 这里的sink是上面提到的ForEachOp实例
    Objects.requireNonNull(sink);

    // 不断的将p设置为前一个stage节点,然后调用p#opWrapSink构造出上一个stage对应的sink实例,这样p就知道了它的downstream是sink了
    // 初始化的时候,AbstractPipeline.this对应的是filter()方法返回的实例对象,sink是TerminalOp实例
    // p.depth > 0意味着sourceStage,即ReferencePipeline.Head实例不会包含到sink链中,因为sourceStage.depth = 0
    for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
        // 关联好后,sink变成前一个stage
        sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
    }
    return (Sink<P_IN>) sink;// 这个sink已经是sink链的链头了
}
image.png
回看前面提到的filter()方法,重点关注opWrapSink()方法:
// java.util.stream.ReferencePipeline#filter
@Override
public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
    Objects.requireNonNull(predicate);
    return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
                                 StreamOpFlag.NOT_SIZED) {
        // 重点关注这里
        // opWrapSink的作用是创建当前节点的Sink实例。
        @Override
        Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
            // 将当前的stage封装成Sink实例,实例的downstream就是参数sink
            return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
                @Override
                public void begin(long size) {
                    downstream.begin(-1);
                }

                @Override
                public void accept(P_OUT u) {
                    // 这里先不关注,待会还要再提到
                    if (predicate.test(u))
                        downstream.accept(u);
                }
            };
        }
    };
}

// java.util.stream.Sink.ChainedReference#ChainedReference
// 构造方法,创建sink实例,同时告知downstream,这样当前sink实例和downstream实例就串联起来了
public ChainedReference(Sink<? super E_OUT> downstream) {
    this.downstream = Objects.requireNonNull(downstream);
}

可以看到上面方法就是构造filter()方法这一步stage对应的Sink实例,并且串联起当前sink实例与downstream。这样在执行的时候就知道了,当前sink执行完了,就调用downstream的方法执行。
构造好的sink链如下:

image.png

sink链构造好了,下面就是执行的过程:

// java.util.stream.AbstractPipeline#copyInto
@Override
final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
    // 此时的wrappedSink已经是链表头了
    Objects.requireNonNull(wrappedSink);

    // 没有短路操作,执行这里
    if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
        wrappedSink.begin(spliterator.getExactSizeIfKnown());// 执行Sink#begin
        // 这里的spliterator是ArraySpliterator,前面提到过
        // 对spliterator实例里面的元素逐个执行
        spliterator.forEachRemaining(wrappedSink);
        wrappedSink.end();
    }
    else {
        copyIntoWithCancel(wrappedSink, spliterator);
    }
}

// java.util.Spliterators.ArraySpliterator#forEachRemaining
@SuppressWarnings("unchecked")
@Override
public void forEachRemaining(Consumer<? super T> action) {
    Object[] a; int i, hi; // hoist accesses and checks from loop
    if (action == null)
        throw new NullPointerException();
    if ((a = array).length >= (hi = fence) &&
        (i = index) >= 0 && i < (index = hi)) {
        // 对于串行执行的,就是对数组里的每个元素执行action.accept
        // 并行因为调用了trySplit,index和fence会不同,不是这里关注的重点。
        // 这里的action就是filter方法里的Sink.ChainedReference
        do { action.accept((T)a[i]); } while (++i < hi);
    }
}

上面的方法中调用了sink链的accept()方法,回看前面提到的filter()方法,重点关注accept()方法:

// java.util.stream.ReferencePipeline#filter
@Override
public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
    Objects.requireNonNull(predicate);
    return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
                                 StreamOpFlag.NOT_SIZED) {
        @Override
        Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
            return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
                @Override
                public void begin(long size) {
                    downstream.begin(-1);
                }

                @Override
                public void accept(P_OUT u) {
                    // 重点关注这里
                    if (predicate.test(u))
                        // 调用downstream的accept,就串联执行了
                        downstream.accept(u);
                }
            };
        }
    };
}

// java.util.stream.ForEachOps.ForEachOp.OfRef#accept
@Override
public void accept(T t) {
    // comsumer就是forEach方法里的lambda表达式
    consumer.accept(t);
}

至此,一个完整的Stream流程执行完毕。
附上Sink对应的部分类图:

image.png

这里XXXOps都是对应XXXOp的工厂,如ReduceOpsReduceOp的工厂,用于生产ReduceOp实例。

总结

总结一下:

  1. Stream是支持顺序和并行聚集操作的一系列元素。这里着重描述的是:Stream = 操作 + 元素
  2. Pipeline是管道,PipelineHelper是将Stream的信息收集到一个地方;
  3. Stage就是一个完整的操作,每一个PipelineHelper(或者说ReferencePipeline)的实例就是一个Stage,关联起所有Stage将形成一个双向链表。
  4. Sink链的主要作用是控制Stream中的每一个元素通过Stage形成的管道。
  5. Stream的逻辑是:
  • 构造stage双向链表
  • 利用stage双向链表逆序构造Sink
  • 通过Spliterator#forEachRemaining逐个处理元素,代码类似于:
for (T element : allElements) {// 逐个元素处理
    Sink consumer = sinkHead;// sink链头
    While (consumer != null) {
        consumer.accept(element);// 消费元素
        consumer = consumer.downstream;// 传递给下游处理
    }
}

关于Stream带来好处的思考

首先是编程思维的转变,原来是面向对象,现在是面向数据。你需要考虑如何通过对手里的数据进行操作得到预期的结果。这个有点想SQL
其次是代码解决性、可读性,这个感觉更多是函数式编程带来的好处
最后,也是最重要的,依托java.util.concurrent包,将并行化能力进行封装,也就是说,开发者可以通过最简单的方式实现并行化数据处理,不在需要控制线程(新建、启动、通信、异常处理)。

一些参考

深入理解Java Stream流水线
Java 8 Stream探秘
JAVA8中的stream原理解析——1(串行)
java8 Stream的实现原理 (从零开始实现一个stream流)
Stream源码分析
jdk8中Spliterator的作用
java8 Stream Pipelines 浅析

Side-effect是只一个方法不只是返回一个结果,还会改变对象的状态。

相关文章

  • Stream原理研究

    几个概念 需要了解Stream、Pipeline、Stage、Sink几个概念。 Stream Stream就是一...

  • JAVA8新特性: Stream-集合流操作

    Stream类全路径为:java.util.stream.Stream Stream简介 Stream原理 Str...

  • Java Stream 源码分析

    前言 Java 8 的 Stream 使得代码更加简洁易懂,本篇文章深入分析 Java Stream 的工作原理,...

  • Java Stream 源码分析

    前言 Java 8 的 Stream 使得代码更加简洁易懂,本篇文章深入分析 Java Stream 的工作原理,...

  • Java-2

    流 Stream stream深入解析- 最详细的幕后原理 几个关键概念 流来源有一种称为 Spliterator...

  • Java8 Stream 语法详解 & 用法实例

    本文将会详细讲解Stream的使用方法(不会涉及Stream的原理,因为这个系列的文章还是一个快速学习如何使用的)...

  • java1.8 stream源码解析(串行stream)

    常用的流操作 在深入原理之前,我们有必要知道关于Stream的一些基础知识,关于Stream的操作分类,如表1-1...

  • 2021-Flutter

    1、flutter与原生通信,platform channel原理? 2、future 和 stream数据流 3...

  • Stream自动并行原理

    1.在哪里调用了ForkJoinPool? 1.1 parallelStream示例 1.2 ReduceTask...

  • 2022-04-15

    如何学习: 理解概念的能力 研究概念的能力 理解原理的能力 研究原理的能力(原理背后是什么) 审题解题的能力 研究...

网友评论

    本文标题:Stream原理研究

    本文链接:https://www.haomeiwen.com/subject/ymbvbctx.html