Stream原理研究

作者: 李不言被占用了 | 来源:发表于2019-11-16 22:02 被阅读0次

    几个概念

    需要了解StreamPipelineStageSink几个概念。

    Stream

    Stream就是一系列元素。是怎样的一系列元素呢?是支持顺序和并行聚集操作的一系列元素。
    Stream定义了一些中间操作(Intermediate operations)和结束操作(Terminal operations),
    中间操作包括无状态(Stateless)操作比如:filter, map, flatMap等,有状态(Stateful)操作比如:distinct, sorted, limit等;
    结束操作(Terminal operations)包括非短路操作(short-circuiting)比如:forEach, reduce, collect等和短路操作如:findFirst, findAny

    这些操作需要被按顺序记录下来,这就需要Pipeline了。

    Pipeline

    Pipeline就是管道的概念。
    管道有一个基类PipelineHelper,他是执行Stream管道的一个helper,将Stream的所有信息收集到一个地方。

    上面所说的操作其实都定义在PipelineHelper的一个子类ReferencePipeline中,包括Head(Source stage of a ReferencePipeline)、StatelessOp(Base class for a stateless intermediate stage of a Stream.)、StatefulOp(Base class for a stateful intermediate stage of a Stream.)静态内部类。

    Stage

    Stream中使用Stage的概念来描述一个完整的操作,并用某种实例化后的PipelineHelper来代表Stage,将具有先后顺序的各个Stage连到一起,就构成了整个流水线。

    简单理解就是每一个操作就会产生一个stage

    image.png

    Sink

    现在有了Stage了,如何串联起这些stage呢?通过Sink
    java.util.stream.Sink.ChainedReference来看,里面有一个downstream的实例成员,只要上游的sink调用下游的accept()方法,即可保证串联起来执行:即每一个上游的sink执行完自己的accept()逻辑以后,调用下游sinkaccept()方法(downstream.accept())。

    通过stage构建起来的双向链表只是给所有stage关联在一起,每个stage都知道自己前后有没有stage存在,却没法知道后面stage到底执行了哪种操作,以及回调函数是哪种形式。这就好像一个网络环境,根据iptables我知道了我下游的路由器在哪里,那我怎么把数据传递给它呢?通过协议,TCP协议!这么看的话,Sink更像是一个协议,让每一个stage执行完以后,知道调用某个方法来将数据传递给下一个stage,也就是Sink#accept()

    类图1

    通过代码来讲讲:

    @Test
    public void testFilter() {
        Stream.of(1, 2, 3, 4, 5)
                .filter(item -> item > 3)
                .forEach(System.out::println);// 打印结果:4,5
    }
    

    其实这段代码等价于:

    @Test
    public void testFilter() {
        Stream<Integer> head = Stream.of(1, 2, 3, 4, 5);
        Stream<Integer> filterStream = head.filter(i -> i > 3);
        filterStream.forEach(System.out::println);
        filterStream.count();
    }
    

    可以看到,前面每一步都会返回一个新的Stream,严格的说,其实是Stream的子类,更严格说,是ReferencePipeline的子类,参见上图类图1。具体如下:

    // java.util.Arrays#stream(T[], int, int)
    // 创建一个Stream实例
    public static <T> Stream<T> stream(T[] array, int startInclusive, int endExclusive) {
        //  spliterator方法创建一个ArraySpliterator,后面会关注到ArraySpliterator#forEachRemaining
        return StreamSupport.stream(spliterator(array, startInclusive, endExclusive), false);
    }
    
    // java.util.stream.StreamSupport#stream(java.util.Spliterator<T>, boolean)
    public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {
        Objects.requireNonNull(spliterator);
        // 创建一个管道(pipeline)的头节点
        return new ReferencePipeline.Head<>(spliterator,
                                            StreamOpFlag.fromCharacteristics(spliterator),
                                            parallel);
    }
    

    通过上面的代码可以看到,真正返回的是ReferencePipeline.Head的实例,当然,就是Stream的子类。
    接着看filter()方法:

    // java.util.stream.ReferencePipeline#filter
    @Override
    public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
        Objects.requireNonNull(predicate);
        // filter是一个无状态操作(StatelessOp),StatelessOp是管道的一个节点,也是Stream的子类
        // 这段代码重点关注返回的StatelessOp实例,this是当前Stream实例,即上面代码Stream#of方法产生的实例,也就是ReferencePipeline.Head实例
        // 通过这个方法串联起新建节点StatelessOp与ReferencePipeline.Head的关系,即StatelessOp#previousStage=ReferencePipeline.Head
        return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
                                     StreamOpFlag.NOT_SIZED) {
            // 这个方法先不关注,后面还会说到
            @Override
            Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
                return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
                    @Override
                    public void begin(long size) {
                        downstream.begin(-1);
                    }
    
                    @Override
                    public void accept(P_OUT u) {
                        if (predicate.test(u))
                            downstream.accept(u);
                    }
                };
            }
        };
    }
    

    filter方法的默认实现是在ReferencePipeline中实现的。上面这段代码我们需要重点关注的是,返回了一个StatelessOp对象,也是ReferencePipeline的子类。关注一下构造StatelessOp的过程:

    // java.util.stream.ReferencePipeline.StatelessOp#StatelessOp
    StatelessOp(AbstractPipeline<?, E_IN, ?> upstream,
                StreamShape inputShape,
                int opFlags) {
        super(upstream, opFlags);
        assert upstream.getOutputShape() == inputShape;
    }
    
    // java.util.stream.ReferencePipeline#ReferencePipeline(java.util.stream.AbstractPipeline<?,P_IN,?>, int)
    ReferencePipeline(AbstractPipeline<?, P_IN, ?> upstream, int opFlags) {
        super(upstream, opFlags);
    }
    
    // java.util.stream.AbstractPipeline#AbstractPipeline(java.util.stream.AbstractPipeline<?,E_IN,?>, int)
    AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {
        if (previousStage.linkedOrConsumed)
            throw new IllegalStateException(MSG_STREAM_LINKED);
        previousStage.linkedOrConsumed = true;
        // 这里是关联stage
        previousStage.nextStage = this;
        this.previousStage = previousStage;
        
        this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;
        this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);
        this.sourceStage = previousStage.sourceStage;// sourceStage是ReferencePipeline.Head
        if (opIsStateful())
            sourceStage.sourceAnyStateful = true;
        this.depth = previousStage.depth + 1;
    }
    

    通过上面的操作,形成了这样的双向链表:
    ReferencePipeline.Head <-> StatelessOp
    一般后面在有其他操作,例如map()sort()limit()等,都是返回一个ReferencePipeline实例(StatefulOp/StatelessOp),在构造方法里面将双向链表串联起来。

    最后来看forEach()方法:

    // java.util.stream.ReferencePipeline#forEach
    @Override
    public void forEach(Consumer<? super P_OUT> action) {
        // 通过工厂ForEachOps返回一个ForEachOp实例(下面会提到),本质是一个TerminalOp,即:构造了一个终止操作实例
        evaluate(ForEachOps.makeRef(action, false));
    }
    

    evaluate()方法就是一个执行Stream的操作。

    // java.util.stream.AbstractPipeline#evaluate(java.util.stream.TerminalOp<E_OUT,R>)
    final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
        assert getOutputShape() == terminalOp.inputShape();
        if (linkedOrConsumed)
            throw new IllegalStateException(MSG_STREAM_LINKED);
        linkedOrConsumed = true;
    
        return isParallel()
               ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
               // 关注顺序执行这里
               // this是由filter方法返回的Stream,严格说是PipelineHelper,terminalOp是上面提到的ForEachOp实例
               : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
    }
    

    现在我们只关注顺序执行这一段:

    // java.util.stream.ForEachOps.ForEachOp#evaluateSequential
    @Override
    public <S> Void evaluateSequential(PipelineHelper<T> helper,
                                       Spliterator<S> spliterator) {
        // this是上面提到的ForEachOp实例,helper是filter方法返回的PipelineHelper
        return helper.wrapAndCopyInto(this, spliterator).get();
    }
    

    wrapAndCopyInto方法主要干了两件事:

    1. 封装sink链;
    2. 将封装好的sink链从头到尾执行。
    // 串联sink并执行
    // 前面提到反向串联,这里是顺序串联
    // 这个是helper的实例方法,也就是filter()方法构造出来的实例对象
    // java.util.stream.AbstractPipeline#wrapAndCopyInto
    @Override
    final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) {
        // 这里的sink是上面提到的ForEachOp实例
        copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
        return sink;
    }
    

    先看看对sink链的封装:

    // java.util.stream.AbstractPipeline#wrapSink
    // sink是ForEachOp实例
    @Override
    @SuppressWarnings("unchecked")
    final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
        // 这里的sink是上面提到的ForEachOp实例
        Objects.requireNonNull(sink);
    
        // 不断的将p设置为前一个stage节点,然后调用p#opWrapSink构造出上一个stage对应的sink实例,这样p就知道了它的downstream是sink了
        // 初始化的时候,AbstractPipeline.this对应的是filter()方法返回的实例对象,sink是TerminalOp实例
        // p.depth > 0意味着sourceStage,即ReferencePipeline.Head实例不会包含到sink链中,因为sourceStage.depth = 0
        for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
            // 关联好后,sink变成前一个stage
            sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
        }
        return (Sink<P_IN>) sink;// 这个sink已经是sink链的链头了
    }
    
    image.png
    回看前面提到的filter()方法,重点关注opWrapSink()方法:
    // java.util.stream.ReferencePipeline#filter
    @Override
    public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
        Objects.requireNonNull(predicate);
        return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
                                     StreamOpFlag.NOT_SIZED) {
            // 重点关注这里
            // opWrapSink的作用是创建当前节点的Sink实例。
            @Override
            Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
                // 将当前的stage封装成Sink实例,实例的downstream就是参数sink
                return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
                    @Override
                    public void begin(long size) {
                        downstream.begin(-1);
                    }
    
                    @Override
                    public void accept(P_OUT u) {
                        // 这里先不关注,待会还要再提到
                        if (predicate.test(u))
                            downstream.accept(u);
                    }
                };
            }
        };
    }
    
    // java.util.stream.Sink.ChainedReference#ChainedReference
    // 构造方法,创建sink实例,同时告知downstream,这样当前sink实例和downstream实例就串联起来了
    public ChainedReference(Sink<? super E_OUT> downstream) {
        this.downstream = Objects.requireNonNull(downstream);
    }
    

    可以看到上面方法就是构造filter()方法这一步stage对应的Sink实例,并且串联起当前sink实例与downstream。这样在执行的时候就知道了,当前sink执行完了,就调用downstream的方法执行。
    构造好的sink链如下:

    image.png

    sink链构造好了,下面就是执行的过程:

    // java.util.stream.AbstractPipeline#copyInto
    @Override
    final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
        // 此时的wrappedSink已经是链表头了
        Objects.requireNonNull(wrappedSink);
    
        // 没有短路操作,执行这里
        if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
            wrappedSink.begin(spliterator.getExactSizeIfKnown());// 执行Sink#begin
            // 这里的spliterator是ArraySpliterator,前面提到过
            // 对spliterator实例里面的元素逐个执行
            spliterator.forEachRemaining(wrappedSink);
            wrappedSink.end();
        }
        else {
            copyIntoWithCancel(wrappedSink, spliterator);
        }
    }
    
    // java.util.Spliterators.ArraySpliterator#forEachRemaining
    @SuppressWarnings("unchecked")
    @Override
    public void forEachRemaining(Consumer<? super T> action) {
        Object[] a; int i, hi; // hoist accesses and checks from loop
        if (action == null)
            throw new NullPointerException();
        if ((a = array).length >= (hi = fence) &&
            (i = index) >= 0 && i < (index = hi)) {
            // 对于串行执行的,就是对数组里的每个元素执行action.accept
            // 并行因为调用了trySplit,index和fence会不同,不是这里关注的重点。
            // 这里的action就是filter方法里的Sink.ChainedReference
            do { action.accept((T)a[i]); } while (++i < hi);
        }
    }
    

    上面的方法中调用了sink链的accept()方法,回看前面提到的filter()方法,重点关注accept()方法:

    // java.util.stream.ReferencePipeline#filter
    @Override
    public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
        Objects.requireNonNull(predicate);
        return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
                                     StreamOpFlag.NOT_SIZED) {
            @Override
            Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
                return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
                    @Override
                    public void begin(long size) {
                        downstream.begin(-1);
                    }
    
                    @Override
                    public void accept(P_OUT u) {
                        // 重点关注这里
                        if (predicate.test(u))
                            // 调用downstream的accept,就串联执行了
                            downstream.accept(u);
                    }
                };
            }
        };
    }
    
    // java.util.stream.ForEachOps.ForEachOp.OfRef#accept
    @Override
    public void accept(T t) {
        // comsumer就是forEach方法里的lambda表达式
        consumer.accept(t);
    }
    

    至此,一个完整的Stream流程执行完毕。
    附上Sink对应的部分类图:

    image.png

    这里XXXOps都是对应XXXOp的工厂,如ReduceOpsReduceOp的工厂,用于生产ReduceOp实例。

    总结

    总结一下:

    1. Stream是支持顺序和并行聚集操作的一系列元素。这里着重描述的是:Stream = 操作 + 元素
    2. Pipeline是管道,PipelineHelper是将Stream的信息收集到一个地方;
    3. Stage就是一个完整的操作,每一个PipelineHelper(或者说ReferencePipeline)的实例就是一个Stage,关联起所有Stage将形成一个双向链表。
    4. Sink链的主要作用是控制Stream中的每一个元素通过Stage形成的管道。
    5. Stream的逻辑是:
    • 构造stage双向链表
    • 利用stage双向链表逆序构造Sink
    • 通过Spliterator#forEachRemaining逐个处理元素,代码类似于:
    for (T element : allElements) {// 逐个元素处理
        Sink consumer = sinkHead;// sink链头
        While (consumer != null) {
            consumer.accept(element);// 消费元素
            consumer = consumer.downstream;// 传递给下游处理
        }
    }
    

    关于Stream带来好处的思考

    首先是编程思维的转变,原来是面向对象,现在是面向数据。你需要考虑如何通过对手里的数据进行操作得到预期的结果。这个有点想SQL
    其次是代码解决性、可读性,这个感觉更多是函数式编程带来的好处
    最后,也是最重要的,依托java.util.concurrent包,将并行化能力进行封装,也就是说,开发者可以通过最简单的方式实现并行化数据处理,不在需要控制线程(新建、启动、通信、异常处理)。

    一些参考

    深入理解Java Stream流水线
    Java 8 Stream探秘
    JAVA8中的stream原理解析——1(串行)
    java8 Stream的实现原理 (从零开始实现一个stream流)
    Stream源码分析
    jdk8中Spliterator的作用
    java8 Stream Pipelines 浅析

    Side-effect是只一个方法不只是返回一个结果,还会改变对象的状态。

    相关文章

      网友评论

        本文标题:Stream原理研究

        本文链接:https://www.haomeiwen.com/subject/ymbvbctx.html