美文网首页Java并发
Stream自动并行原理

Stream自动并行原理

作者: 王侦 | 来源:发表于2019-07-25 10:34 被阅读0次

    1.在哪里调用了ForkJoinPool?

    1.1 parallelStream示例

    // Example pipeline whose call chain is traced below: create a stream from five values,
    // mark it parallel, add 1 to each element, then reduce the elements pairwise with addition.
    // reduce(...) returns an Optional, so get() unwraps the final value.
    int result = Stream.of(1,2,3,4,5).parallel().map(x -> x + 1).reduce((a, b) -> a + b).get();
    
        /**
         * Creates a stream whose elements are the given values, by delegating to
         * {@code Arrays.stream} on the varargs array.
         *
         * @param <T> the element type
         * @param values the elements of the new stream
         * @return the stream produced by {@code Arrays.stream(values)}
         */
        @SafeVarargs
        @SuppressWarnings("varargs") // Creating a stream from an array is safe
        public static <T> Stream<T> of(T... values) {
            final Stream<T> elements = Arrays.stream(values);
            return elements;
        }
    
        /**
         * Marks the pipeline as parallel by setting the {@code parallel} flag on the
         * source stage, then returns this same stage.
         *
         * @return this stage, cast to its own stream type
         */
        @Override
        @SuppressWarnings("unchecked")
        public final S parallel() {
            final S self = (S) this;
            sourceStage.parallel = true;
            return self;
        }
    
        /**
         * Appends a stateless stage that applies {@code mapper} to every element.
         * No element is processed here: the returned stage only knows how to wrap a
         * downstream sink so that each accepted element is mapped before being passed on.
         *
         * @param <R> element type of the resulting stream
         * @param mapper function applied to each element; must not be null
         * @return the new pipeline stage
         */
        @Override
        @SuppressWarnings("unchecked")
        public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
            Objects.requireNonNull(mapper);
            // Flags passed to the new stage: NOT_SORTED and NOT_DISTINCT.
            final int stageFlags = StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT;
            return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE, stageFlags) {
                @Override
                Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
                    return new Sink.ChainedReference<P_OUT, R>(sink) {
                        @Override
                        public void accept(P_OUT element) {
                            final R mapped = mapper.apply(element);
                            downstream.accept(mapped);
                        }
                    };
                }
            };
        }
    
        /**
         * Terminal operation: builds a reduce operation from the accumulator via
         * {@code ReduceOps.makeRef} and hands it to {@code evaluate(...)}, which runs the
         * pipeline.
         *
         * @param accumulator function that combines two elements into one
         * @return the {@code Optional} produced by evaluating the reduce operation
         */
        @Override
        public final Optional<P_OUT> reduce(BinaryOperator<P_OUT> accumulator) {
            return evaluate(ReduceOps.makeRef(accumulator));
        }
    
        /**
         * Runs the pipeline to completion with the given terminal operation.
         * A pipeline may be evaluated only once: the {@code linkedOrConsumed} flag is
         * checked and then set, and a second attempt fails with
         * {@link IllegalStateException}. Depending on {@code isParallel()}, the work is
         * delegated to the terminal operation's parallel or sequential evaluation.
         *
         * @param <R> the type of result
         * @param terminalOp the terminal operation to be applied to the pipeline
         * @return the result
         * @throws IllegalStateException if this stage was already linked or consumed
         */
        final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
            assert getOutputShape() == terminalOp.inputShape();
            if (linkedOrConsumed) {
                throw new IllegalStateException(MSG_STREAM_LINKED);
            }
            linkedOrConsumed = true;

            if (isParallel()) {
                return terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()));
            }
            return terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
        }
    
    
            /**
             * Parallel evaluation of the reduce operation: wraps this operation, the
             * pipeline helper and the source spliterator in a new {@code ReduceTask},
             * runs it with {@code invoke()}, and returns the result of calling
             * {@code get()} on whatever {@code invoke()} returns.
             *
             * @param helper the pipeline helper handed to the task
             * @param spliterator the source spliterator handed to the task
             * @return the value obtained from the task's result via {@code get()}
             */
            @Override
            public <P_IN> R evaluateParallel(PipelineHelper<T> helper,
                                             Spliterator<P_IN> spliterator) {
                return new ReduceTask<>(this, helper, spliterator).invoke().get();
            }
    

    1.2 ReduceTask会调用invoke

            /**
             * Parallel evaluation of the reduce operation: creates a {@code ReduceTask}
             * for this operation and immediately calls {@code invoke()} on it; the final
             * value is read from the task's result with {@code get()}.
             *
             * @param helper the pipeline helper handed to the task
             * @param spliterator the source spliterator handed to the task
             * @return the value obtained from the task's result via {@code get()}
             */
            @Override
            public <P_IN> R evaluateParallel(PipelineHelper<T> helper,
                                             Spliterator<P_IN> spliterator) {
                return new ReduceTask<>(this, helper, spliterator).invoke().get();
            }
    

    调用的是ForkJoinTask的invoke:

        /**
         * Runs this task via {@code doInvoke()} and returns its raw result.
         * The returned status is masked with {@code DONE_MASK}; any outcome other than
         * {@code NORMAL} is passed to {@code reportException} before the result is read.
         *
         * @return the value of {@code getRawResult()}
         */
        public final V invoke() {
            final int outcome = doInvoke() & DONE_MASK;
            if (outcome != NORMAL) {
                reportException(outcome);
            }
            return getRawResult();
        }
    
        /**
         * Executes this task in the calling thread and, if that did not leave it with a
         * negative status, waits for it to finish.
         * A fork/join worker thread waits through its pool's {@code awaitJoin}; any other
         * thread waits through {@code externalAwaitDone()}.
         *
         * @return the status reported by whichever step finished the call
         */
        private int doInvoke() {
            final int status = doExec();
            if (status < 0) {
                return status;
            }
            final Thread current = Thread.currentThread();
            if (current instanceof ForkJoinWorkerThread) {
                final ForkJoinWorkerThread worker = (ForkJoinWorkerThread) current;
                return worker.pool.awaitJoin(worker.workQueue, this, 0L);
            }
            return externalAwaitDone();
        }
    
        /**
         * Runs {@code exec()} unless the status is already negative.
         * A throwable from {@code exec()} is recorded with
         * {@code setExceptionalCompletion}; a {@code true} return marks the task
         * {@code NORMAL}; a {@code false} return leaves the status as it was read.
         *
         * @return the status after this attempt
         */
        final int doExec() {
            final int current = status;
            if (current < 0) {
                return current;
            }
            final boolean completed;
            try {
                completed = exec();
            } catch (Throwable rex) {
                return setExceptionalCompletion(rex);
            }
            return completed ? setCompletion(NORMAL) : current;
        }
    

    调用CountedCompleter的exec():

        /**
         * Implements execution conventions for CountedCompleters.
         * Runs {@code compute()} and always returns {@code false}, so a caller that
         * marks normal completion only on a {@code true} return (as {@code doExec()}
         * does) will not do so here.
         *
         * @return always {@code false}
         */
        protected final boolean exec() {
            compute();
            return false;
        }
    

    调用stream包下面的AbstractTask.compute:

        /**
         * Splits this task's input until the remaining piece is no larger than the target
         * size (or cannot be split further), forking one child per split and continuing
         * with the other child in the current thread. The piece that remains is evaluated
         * with {@code doLeaf()}, after which completion is propagated via {@code tryComplete()}.
         */
        @Override
        public void compute() {
            Spliterator<P_IN> rs = spliterator, ls; // right, left spliterators
            long sizeEstimate = rs.estimateSize();
            // The threshold is computed once from the initial estimate and reused for every split.
            long sizeThreshold = getTargetSize(sizeEstimate);
            // Flipped on every iteration, so the forked side alternates: left, right, left, ...
            boolean forkRight = false;
            // 'task' is the node this thread is currently working on; it starts at this task
            // and descends to one of the children after each split.
            @SuppressWarnings("unchecked") K task = (K) this;
            // Stop when the piece is small enough or trySplit() declines to split (returns null).
            while (sizeEstimate > sizeThreshold && (ls = rs.trySplit()) != null) {
                K leftChild, rightChild, taskToFork;
                // ls is the part split off; rs is the same spliterator object after the split.
                task.leftChild  = leftChild = task.makeChild(ls);
                task.rightChild = rightChild = task.makeChild(rs);
                // Presumably so that, of the two children, only the second one to finish
                // triggers this node's onCompletion (tryComplete decrements a non-zero count).
                task.setPendingCount(1);
                if (forkRight) {
                    // Fork the right child; keep working on the left piece in this thread.
                    forkRight = false;
                    rs = ls;
                    task = leftChild;
                    taskToFork = rightChild;
                }
                else {
                    // Fork the left child; keep working on the right piece (rs is unchanged).
                    forkRight = true;
                    task = rightChild;
                    taskToFork = leftChild;
                }
                taskToFork.fork();
                // Re-estimate the size of the piece this thread kept.
                sizeEstimate = rs.estimateSize();
            }
            // 'task' is now a leaf: evaluate it here and record the result on that node.
            task.setLocalResult(task.doLeaf());
            task.tryComplete();
        }
    

    调用ForkJoinTask的fork:

        /**
         * Queues this task for execution.
         * When called from a fork/join worker thread the task is pushed onto that
         * worker's own queue; from any other thread it is handed to
         * {@code ForkJoinPool.common} through {@code externalPush}.
         *
         * @return this task
         */
        public final ForkJoinTask<V> fork() {
            final Thread current = Thread.currentThread();
            if (current instanceof ForkJoinWorkerThread) {
                final ForkJoinWorkerThread worker = (ForkJoinWorkerThread) current;
                worker.workQueue.push(this);
            } else {
                ForkJoinPool.common.externalPush(this);
            }
            return this;
        }
    

    2.并行的核心就是任务拆分

    2.1 调用stream包下面的AbstractTask.compute

        /**
         * Core of the task splitting: while the current piece is larger than the target
         * size and can still be split, split it in two, fork one half as a new task and
         * keep the other half for this thread. Once splitting stops, the remaining piece
         * is evaluated with {@code doLeaf()} and {@code tryComplete()} is called on it.
         */
        @Override
        public void compute() {
            Spliterator<P_IN> rs = spliterator, ls; // right, left spliterators
            long sizeEstimate = rs.estimateSize();
            // Computed once from the initial estimate; every later split is compared against it.
            long sizeThreshold = getTargetSize(sizeEstimate);
            // Alternates each iteration between forking the left child and the right child.
            boolean forkRight = false;
            // Node currently owned by this thread; moves down the tree as splits happen.
            @SuppressWarnings("unchecked") K task = (K) this;
            // Loop ends when the piece is small enough or trySplit() returns null.
            while (sizeEstimate > sizeThreshold && (ls = rs.trySplit()) != null) {
                K leftChild, rightChild, taskToFork;
                // One child per half: ls is the split-off part, rs is what the split left behind.
                task.leftChild  = leftChild = task.makeChild(ls);
                task.rightChild = rightChild = task.makeChild(rs);
                // Presumably lets the first finishing child just decrement the count and the
                // second one run this node's onCompletion (see tryComplete) - confirm.
                task.setPendingCount(1);
                if (forkRight) {
                    // This round: right child is forked, this thread continues with the left piece.
                    forkRight = false;
                    rs = ls;
                    task = leftChild;
                    taskToFork = rightChild;
                }
                else {
                    // This round: left child is forked, this thread continues with the right piece.
                    forkRight = true;
                    task = rightChild;
                    taskToFork = leftChild;
                }
                taskToFork.fork();
                // Size of the piece kept by this thread decides whether to split again.
                sizeEstimate = rs.estimateSize();
            }
            // Leaf reached: compute its result in this thread and store it on the leaf node.
            task.setLocalResult(task.doLeaf());
            task.tryComplete();
        }
    

    2.2 ReduceTask.doLeaf()

        /**
         * Task that performs a reduce operation over one piece of the input.
         * Leaf tasks push their piece of the input through a fresh sink; non-leaf tasks
         * merge the right child's sink into the left child's sink once both are available.
         *
         * @param <P_IN> element type of the source spliterator
         * @param <P_OUT> element type fed into the sink
         * @param <R> result type of the reduce operation
         * @param <S> sink type, which also serves as the per-task result
         */
        private static final class ReduceTask<P_IN, P_OUT, R,
                                              S extends AccumulatingSink<P_OUT, R, S>>
                extends AbstractTask<P_IN, P_OUT, S, ReduceTask<P_IN, P_OUT, R, S>> {

            /** Operation that supplies a new sink for every leaf. */
            private final ReduceOp<P_OUT, R, S> op;

            /** Creates a task from the operation, the pipeline helper and the source spliterator. */
            ReduceTask(ReduceOp<P_OUT, R, S> op,
                       PipelineHelper<P_OUT> helper,
                       Spliterator<P_IN> spliterator) {
                super(helper, spliterator);
                this.op = op;
            }

            /** Creates a child task that shares the parent's operation and covers the given spliterator. */
            ReduceTask(ReduceTask<P_IN, P_OUT, R, S> parent,
                       Spliterator<P_IN> spliterator) {
                super(parent, spliterator);
                this.op = parent.op;
            }

            /** Creates a child of this task for the given spliterator. */
            @Override
            protected ReduceTask<P_IN, P_OUT, R, S> makeChild(Spliterator<P_IN> spliterator) {
                return new ReduceTask<>(this, spliterator);
            }

            /** Evaluates this task's piece: a new sink is wrapped by the pipeline and fed from the spliterator. */
            @Override
            protected S doLeaf() {
                final S sink = op.makeSink();
                return helper.wrapAndCopyInto(sink, spliterator);
            }

            /**
             * For a non-leaf task, folds the right child's result into the left child's
             * result and stores it as this task's result; then defers to the superclass.
             */
            @Override
            public void onCompletion(CountedCompleter<?> caller) {
                if (!isLeaf()) {
                    final S merged = leftChild.getLocalResult();
                    final S right = rightChild.getLocalResult();
                    merged.combine(right);
                    setLocalResult(merged);
                }
                // GC spliterator, left and right child
                super.onCompletion(caller);
            }
        }
    

    由《Stream流水线原理》一文可知,ReduceTask.doLeaf()即求值操作:每个叶子任务只对拆分后分到的那一部分流进行求值。其调用的wrapAndCopyInto如下:

        /**
         * Wraps the given sink with this pipeline's stages, pushes the spliterator's
         * elements into the wrapped sink, and returns the original (unwrapped) sink.
         *
         * @param sink the terminal sink; must not be null
         * @param spliterator source of the elements to push
         * @return the same {@code sink} that was passed in
         */
        @Override
        final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) {
            final S checked = Objects.requireNonNull(sink);
            copyInto(wrapSink(checked), spliterator);
            return sink;
        }
    

    2.3 ReduceTask.onCompletion()

    AbstractTask.compute最后调用tryComplete()。
    CountedCompleter.tryComplete():

        /**
         * Propagates completion up the completer chain, starting at this task.
         * At each task visited: if its pending count is zero, its {@code onCompletion}
         * hook runs and the walk moves on to its completer (when there is none,
         * {@code quietlyComplete()} is called on that last task and the walk ends);
         * otherwise the pending count is atomically decremented and the walk ends.
         */
        public final void tryComplete() {
            // a: task currently being examined; s: task passed to onCompletion as the caller.
            CountedCompleter<?> a = this, s = a;
            for (int c;;) {
                if ((c = a.pending) == 0) {
                    a.onCompletion(s);
                    // Step up one level: s becomes the task just handled, a becomes its completer.
                    if ((a = (s = a).completer) == null) {
                        // No completer: s is the top of the chain.
                        s.quietlyComplete();
                        return;
                    }
                }
                // Non-zero count: try to decrement it. If the CAS fails (the count changed
                // concurrently), loop and re-read the count.
                else if (U.compareAndSwapInt(a, PENDING, c, c - 1))
                    return;
            }
        }
    

    ReduceTask的onCompletion:

            /**
             * Completion hook: for a non-leaf task, merges the right child's result into
             * the left child's result and records it as this task's own result, then
             * defers to the superclass hook.
             *
             * @param caller the task that triggered this completion
             */
            @Override
            public void onCompletion(CountedCompleter<?> caller) {
                if (!isLeaf()) {
                    final S merged = leftChild.getLocalResult();
                    final S right = rightChild.getLocalResult();
                    merged.combine(right);
                    setLocalResult(merged);
                }
                // GC spliterator, left and right child
                super.onCompletion(caller);
            }
    

    由《Stream流水线原理》一文可知,S代表终止操作的Sink。下面以ReduceOps.makeInt中的ReducingSink为例(前文示例的reduce实际调用的是ReduceOps.makeRef,其Sink的结构与此类似):

        /**
         * Builds a terminal operation that reduces {@code int} values with the given
         * binary operator. The result is empty when no value was accepted.
         *
         * @param operator function combining the running value with the next value;
         *                 must not be null
         * @return a reduce operation whose sinks are {@code ReducingSink} instances
         */
        public static TerminalOp<Integer, OptionalInt> makeInt(IntBinaryOperator operator) {
            Objects.requireNonNull(operator);

            // Sink holding the running value; 'empty' is true until the first value is accepted.
            class ReducingSink
                    implements AccumulatingSink<Integer, OptionalInt, ReducingSink>, Sink.OfInt {
                private boolean empty;
                private int state;

                // Resets the sink to "nothing accepted yet".
                public void begin(long size) {
                    state = 0;
                    empty = true;
                }

                // The first value becomes the state; later values are folded in with the operator.
                @Override
                public void accept(int t) {
                    state = empty ? t : operator.applyAsInt(state, t);
                    empty = false;
                }

                @Override
                public OptionalInt get() {
                    if (empty) {
                        return OptionalInt.empty();
                    }
                    return OptionalInt.of(state);
                }

                // Merges another sink by feeding its state through accept(); an empty sink adds nothing.
                @Override
                public void combine(ReducingSink other) {
                    if (other.empty) {
                        return;
                    }
                    accept(other.state);
                }
            }

            return new ReduceOp<Integer, OptionalInt, ReducingSink>(StreamShape.INT_VALUE) {
                @Override
                public ReducingSink makeSink() {
                    return new ReducingSink();
                }
            };
        }
    

    3.使用陷阱

    /**
     * Anti-example for this section: sends the same question to several search engines
     * through a parallel stream and returns whichever response {@code findAny()} yields.
     *
     * NOTE(review): the fetch runs inside the parallel pipeline's map step. The
     * surrounding article describes it as a blocking call that ties up threads of the
     * shared common ForkJoinPool, which is the pitfall being illustrated here.
     * NOTE(review): {@code question} is concatenated into the URL as-is, with no URL
     * encoding.
     *
     * @param question text appended to each engine's base URL
     * @return the value held by the Optional that {@code findAny()} returns
     */
    public static String query(String question) {
        // Double-brace initialization: an anonymous ArrayList subclass whose instance
        // initializer adds the three base URLs.
        List<String> engines = new ArrayList<String>() {{
            add("http://www.google.com/?q=");
            add("http://duckduckgo.com/?q=");
            add("http://www.bing.com/search?q=");
        }};
        // get element as soon as it is available
        Optional<String> result = engines.stream().parallel().map((base) -> {
        String url = base + question;
        // open connection and fetch the result
        return WS.url(url).get();
        }).findAny();
        // get() assumes a value is present; it throws if the Optional is empty.
        return result.get();
    }
    

    查询搜索引擎是一个阻塞操作,因此执行这些任务的线程都会在get()调用处阻塞,等待结果返回。由于parallel stream默认使用公共线程池ForkJoinPool.commonPool(),这些阻塞任务可能在一段时间内占满公共池的全部工作线程。也就是说,此时如果程序中还有其他parallel stream(包括对该查询方法的再次调用)同时运行,它们只能争用同一个线程池,后提交的任务性能会大大受损。

    这意味着,任何依赖parallel stream的程序,一旦公共的ForkJoinPool被其他任务占用,其性能就会变得不可预测,暗藏风险。

    参考

    相关文章

      网友评论

        本文标题:Stream自动并行原理

        本文链接:https://www.haomeiwen.com/subject/trxalctx.html