ParallelStream Source Code Walkthrough

Author: firefly_ | Published 2019-07-11 15:03

    Let's start with the source code.

    AbstractPipeline.class

        /**
         * Evaluate the pipeline with a terminal operation to produce a result.
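         * Evaluates this stream pipeline with the terminal operation terminalOp;
         * during evaluation the stages are linked from back to front to form the processing chain.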
         *
         * @param <R> the type of result
         * @param terminalOp the terminal operation to be applied to the pipeline.
         * @return the result
         */
        final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
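            // This stage's output shape must match the terminal operation's input shape
            assert getOutputShape() == terminalOp.inputShape();
            // A stream may not be linked or consumed more than once
            if (linkedOrConsumed)
                throw new IllegalStateException(MSG_STREAM_LINKED);
            // Mark this stream as consumed
            linkedOrConsumed = true;
            // Evaluate the pipeline with the terminal operation, in parallel or sequentially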
            return isParallel()
                   ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
                   : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
        }
    
        /**
         * Gets the source spliterator (the data source) for this pipeline stage.
         */
        @SuppressWarnings("unchecked")
        private Spliterator<?> sourceSpliterator(int terminalFlags) {
            Spliterator<?> spliterator = null;
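            // 1) The source stage holds a spliterator directly
            if (sourceStage.sourceSpliterator != null) {
                // Read it
                spliterator = sourceStage.sourceSpliterator;
                // Clear it after use so it cannot be consumed twice
                sourceStage.sourceSpliterator = null;
            }
            // 2) Otherwise obtain the spliterator from sourceSupplier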
            else if (sourceStage.sourceSupplier != null) {
                spliterator = (Spliterator<?>) sourceStage.sourceSupplier.get();
                sourceStage.sourceSupplier = null;
            }
            else {
                throw new IllegalStateException(MSG_CONSUMED);
            }
    
            // The stream is parallel && the pipeline contains a stateful operation
            if (isParallel() && sourceStage.sourceAnyStateful) {
                // Adapt the source spliterator, evaluating each stateful op in the pipeline up to and including this pipeline stage.
                // The depth and flags of each pipeline stage are adjusted accordingly.
                int depth = 1;
                /*
                 * Walk the stages from the source stage up to and including the current stage
                 */
                for (@SuppressWarnings("rawtypes") AbstractPipeline u = sourceStage, p = sourceStage.nextStage, e = this;
                        u != e;
                        u = p, p = p.nextStage) {
    
                    int thisOpFlags = p.sourceOrOpFlags;
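                    // The stage being processed is a stateful operation
                    if (p.opIsStateful()) {
                        depth = 0;
                        // This stage is a short-circuiting operation: clear the flag for the stages that follow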
                        if (StreamOpFlag.SHORT_CIRCUIT.isKnown(thisOpFlags)) {
                            thisOpFlags = thisOpFlags & ~StreamOpFlag.IS_SHORT_CIRCUIT;
                        }
    
                        spliterator = p.opEvaluateParallelLazy(u, spliterator);
    
                        // Inject or clear SIZED on the source pipeline stage based on the stage's spliterator
                        thisOpFlags = spliterator.hasCharacteristics(Spliterator.SIZED)
                                ? thisOpFlags & ~StreamOpFlag.NOT_SIZED | StreamOpFlag.IS_SIZED
                                        : thisOpFlags & ~StreamOpFlag.IS_SIZED | StreamOpFlag.NOT_SIZED;
                    }
                    p.depth = depth++;
                    p.combinedFlags = StreamOpFlag.combineOpFlags(thisOpFlags, u.combinedFlags);
                }
            }
    
            // The terminal operation carries flags
            if (terminalFlags != 0)  {
                // Merge the terminal operation's flags into this (the last) stage
                combinedFlags = StreamOpFlag.combineOpFlags(terminalFlags, combinedFlags);
            }
    
            return spliterator;
        }
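
The linkedOrConsumed check in evaluate() is what makes a stream single-use. The following is a minimal sketch of that behavior as seen from user code; the class name StreamReuseDemo and its method are hypothetical and exist only for illustration.

```java
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical demo class (not part of the JDK).
class StreamReuseDemo {

    /** Returns true if a second terminal operation on the same stream is rejected. */
    static boolean secondTerminalOpFails() {
        Stream<String> s = Stream.of("a", "b", "c");
        // First terminal operation: evaluate() marks the pipeline as consumed.
        s.collect(Collectors.toList());
        try {
            // Second terminal operation on the same pipeline object.
            s.collect(Collectors.toList());
            return false;
        } catch (IllegalStateException e) {
            // Thrown because the stream has already been linked or consumed.
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("second terminal op rejected: " + secondTerminalOpFails());
    }
}
```

To run the same pipeline twice, build a new stream from the source each time instead of holding on to the Stream object.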
    

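    Parallel stream processing
    The parallel computation tasks are created and executed by calling the terminal operation's evaluateParallel(PipelineHelper<T> helper, Spliterator<S> spliterator) method (evaluateParallel belongs to TerminalOp, not to a sink).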
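
As a rough illustration of the two evaluation paths, the sketch below runs one pipeline both sequentially and in parallel and compares the results. The pipeline includes sorted() and limit(), which are stateful operations, so the parallel run goes through the stateful-stage handling in sourceSpliterator() shown earlier. The class ParallelEvalDemo and the method topSquares are hypothetical names chosen for this example.

```java
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical demo class (not part of the JDK).
class ParallelEvalDemo {

    /** Squares of the three largest numbers in 1..1000, largest first. */
    static List<Integer> topSquares(boolean parallel) {
        IntStream source = IntStream.rangeClosed(1, 1_000);
        if (parallel) {
            // Only sets the parallel flag; nothing is evaluated yet.
            source = source.parallel();
        }
        return source.boxed()
                .sorted(Comparator.reverseOrder()) // stateful operation
                .map(i -> i * i)                   // stateless operation
                .limit(3)                          // stateful, short-circuiting operation
                .collect(Collectors.toList());     // terminal operation triggers evaluate()
    }

    public static void main(String[] args) {
        System.out.println(topSquares(false)); // [1000000, 998001, 996004]
        System.out.println(topSquares(true));  // same list: encounter order is preserved
    }
}
```

Both calls return the same list because the stream is ordered; only the way the work is scheduled differs.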

    Parallel computation logic (image from https://www.jianshu.com/p/cf6ce7e14dcc)

    Next, let's look at how AbstractTask.class in the stream package splits and processes tasks.

    AbstractTask class diagram
        /**
         * Decides whether or not to split a task further or compute it
         * directly. If computing directly, calls {@code doLeaf} and pass
         * the result to {@code setRawResult}. Otherwise splits off
         * subtasks, forking one and continuing as the other.
         *
         * <p> The method is structured to conserve resources across a
         * range of uses.  The loop continues with one of the child tasks
         * when split, to avoid deep recursion. To cope with spliterators
         * that may be systematically biased toward left-heavy or
         * right-heavy splits, we alternate which child is forked versus
         * continued in the loop.
         */
        @Override
        public void compute() {
            Spliterator<P_IN> rs = spliterator, ls; // right, left spliterators
            // Read the spliterator's estimated total element count
            long sizeEstimate = rs.estimateSize();
            // Compute the target maximum number of elements for a single leaf task
            final long sizeThreshold = getTargetSize(sizeEstimate);
            boolean forkRight = false;
            @SuppressWarnings("unchecked") K task = (K) this;
            // While the estimated size exceeds the threshold && the spliterator can still be split in two
            while (sizeEstimate > sizeThreshold && (ls = rs.trySplit()) != null) {
                K leftChild, rightChild, taskToFork;
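                // Create a child task from the left spliterator and store it in leftChild
                task.leftChild  = leftChild = task.makeChild(ls);
                // Create a child task from the right spliterator and store it in rightChild
                task.rightChild = rightChild = task.makeChild(rs);
                // Set the task's pending count to 1 (one child is forked into the pool to run in parallel)
                task.setPendingCount(1);
                // Alternate which child is forked: forkRight starts as false, so the left child is forked first, then the right, and so on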
                if (forkRight) {
                    forkRight = false;
                    // Continue splitting with the left spliterator
                    rs = ls;
                    // Task this thread continues with
                    task = leftChild;
                    // Task to fork into the pool for parallel execution
                    taskToFork = rightChild;
                }
                else {
                    forkRight = true;
                    // Task this thread continues with
                    task = rightChild;
                    // Task to fork into the pool for parallel execution
                    taskToFork = leftChild;
                }
                taskToFork.fork();
                // Re-read the estimated element count of the spliterator still being split
                sizeEstimate = rs.estimateSize();
            }
            // The task is now a leaf: run the computation
            task.setLocalResult(task.doLeaf());
            // After the leaf finishes, try to complete; completion propagates toward the root task
            task.tryComplete();
        }
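
To make the split loop easier to follow outside the JDK internals, here is a simplified sketch that applies the same pattern (split while above a threshold, fork one child, continue as the other, alternate sides) to summing a range of longs. It is not the JDK implementation: SplitSumDemo, SumTask, and the explicit threshold parameter are assumptions made for this example, whereas the real AbstractTask derives its threshold from getTargetSize.

```java
import java.util.Spliterator;
import java.util.concurrent.CountedCompleter;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.LongStream;

// Hypothetical demo class (not part of the JDK).
class SplitSumDemo {

    /** Hypothetical task that mirrors the shape of AbstractTask.compute(). */
    static final class SumTask extends CountedCompleter<Long> {
        private final Spliterator.OfLong spliterator;
        private final long threshold; // assumed fixed leaf size, passed in by the caller
        private SumTask left, right;
        private long result;

        SumTask(SumTask parent, Spliterator.OfLong spliterator, long threshold) {
            super(parent);
            this.spliterator = spliterator;
            this.threshold = threshold;
        }

        @Override
        public void compute() {
            Spliterator.OfLong rs = spliterator, ls; // right, left spliterators
            long sizeEstimate = rs.estimateSize();
            boolean forkRight = false;
            SumTask task = this;
            while (sizeEstimate > threshold && (ls = rs.trySplit()) != null) {
                SumTask leftChild = task.left = new SumTask(task, ls, threshold);
                SumTask rightChild = task.right = new SumTask(task, rs, threshold);
                task.setPendingCount(1); // parent completes after both children complete
                SumTask taskToFork;
                if (forkRight) {
                    forkRight = false;
                    rs = ls;              // keep splitting the left half
                    task = leftChild;
                    taskToFork = rightChild;
                } else {
                    forkRight = true;     // keep splitting the right half
                    task = rightChild;
                    taskToFork = leftChild;
                }
                taskToFork.fork();
                sizeEstimate = rs.estimateSize();
            }
            // Leaf: sum the remaining elements directly.
            long[] sum = {0L};
            rs.forEachRemaining((long v) -> sum[0] += v);
            task.result = sum[0];
            task.tryComplete();
        }

        @Override
        public void onCompletion(CountedCompleter<?> caller) {
            if (left != null) {           // internal node: combine the children's results
                result = left.result + right.result;
            }
        }

        @Override
        public Long getRawResult() {
            return result;
        }
    }

    /** Sums from..to (inclusive) using leaves of at most roughly {@code threshold} elements. */
    static long sum(long from, long to, long threshold) {
        Spliterator.OfLong source = LongStream.rangeClosed(from, to).spliterator();
        return ForkJoinPool.commonPool().invoke(new SumTask(null, source, threshold));
    }

    public static void main(String[] args) {
        System.out.println(sum(1, 100_000, 1_000)); // 5000050000
    }
}
```

Continuing in the loop with one child, rather than recursing into compute(), keeps the call stack shallow, which is the design point the Javadoc above calls out.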
    
    Fork/Join flow
