Java Stream的并行实现

作者: 一字马胡 | 来源:发表于2017-09-29 18:10 被阅读373次

    作者: 一字马胡
    转载标志 【2017-11-03】

    更新日志

    日期 更新内容 备注
    2017-11-03 添加转载标志 持续更新

    并行与并发

    关于并发与并行,需要弄清楚的是,并行关注于多个任务同时进行,而并发则通过调度来不停的切换多个任务执行,而实质上多个任务不是同时执的。并发,英文单词为:Concurrent。并行的英文单词为:parallel。如果想对并发和并行有一个比较直观的认识,可以参考下面这张图片:

    并行与并发

    Fork/Join 框架与 Java Stream API

    Fork/Join框架属于并行框架,关于Fork/Join框架的一些内容,可以参考这篇文章:Java Fork/Join并行框架。简单来说,Fork/Join框架可以将大的任务切分为足够小的任务,然后将小任务分配给不同的线程来执行,而线程之间通过工作窃取算法来协调资源,提前昨晚任务的线程可以去“窃取”其他还没有做完任务的线程的任务,而每一个线程都会持有一个双端队列,里面存储着分配给自己的任务,Fork/Join框架在实现上,为了防止线程之间的竞争,线程在消费分配给自己的任务时,是从队列头取任务的,而“窃取”线程则从队列尾部取任务。
    Fork/Join框架通过fork方法来分割大任务,通过使用join来获取小任务的结果,然后组合成大任务的结果。关于Fork/Join任务模型,可以参考下面的图片:

    Fork/Join的任务模型

    关于Java Stream API的相关内容,可以参考该文章:Java Streams API

    Stream在实现上使用了Fork/Join框架来实现并发,所以使用Stream我们可以在不知不觉间就使得我们的程序跑得飞快,究其原因就是Stream使用了Fork/Join并发框架来处理任务,当然,你需要显示的指定Stream为parallel,否则Stream默认都是串行流。比如对于Collection,你可以使用parallelStream来转换为一个并发流,或者使用stream方法转换为串行流,然后使用parallel操作使得串行流变为并发流。本文的重点是剖析Stream是如何使用Fork/Join来做并发的。

    Stream的并发实现细节

    在了解了Fork/Join并发框架和Java Stream之后,首要的问题就是:Stream是如何使用Fork/Join框架来做到并发的?其实对于使用者来说,了解Stream就是通过Fork/Join框架来做的就好了,但是如果想要深入了解一下Fork/Join框架的实践,以及Java Stream的设计方法,那么去读一下实现的源码还是很有必要的,下文中的分析仅代表个人观点!

    需要注意的一点是,Java Stream的操作分为两类,也可以分为三类,具体的细节可以参考该文章:Java Streams API。一个简单的判断一个操作是否是Terminal操作还是Intermediate操作的方法是,如果操作返回的是一个新的Stream,那么就是一个Intermediate操作,否则就是一个Terminal操作。

    • Intermediate:一个流可以后面跟随零个或多个 intermediate 操作。其目的主要是打开流,做出某种程度的数据操作,然后返回一个新的流,交给下一个操作使用。这类操作都是惰性化的(lazy),就是说,仅仅调用到这类方法,并没有真正开始流的遍历。

    • Terminal:一个流只能有一个 terminal 操作,当这个操作执行后,流就被使用“光”了,无法再被操作。所以这必定是流的最后一个操作。Terminal 操作的执行,才会真正开始流的遍历,并且会生成一个结果,或者一个 side effect。

    • 还有一种操作被称为 short-circuiting。用以指:

      • 对于一个 intermediate 操作,如果它接受的是一个无限大(infinite/unbounded)的 Stream,但返回一个 有限的新 Stream。
      • 对于一个 terminal 操作,如果它接受的是一个无限大的 Stream,但能在有限的时间计算出结果。

    Java Stream对四种类型的Terminal操作使用了Fork/Join实现了并发操作,下面的图片展示了这四种操作类型:

    支持并行的四种Stream操作

    我们首先来走一遍Stream操作的执行路径,下面的代码是我们想要做的操作流,下文会根据该代码示例来跟踪Stream的执行路径:

            Stream.of(1,2,3,4)
                    .parallel()
                    .map(n -> n*2)
                    .collect(Collectors.toCollection(ArrayList::new));
    
    

    解释一下,上面的代码想要实现的功能是将(1,2,3,4)这四个数字每一个都变为其自身的两倍,然后收集这些元素到一个ArrayList中返回。这是一个非常简单的功能,下面是上面的操作流的执行路径:

    
        step 1:
        
        public static<T> Stream<T> of(T... values) {
            return Arrays.stream(values);
        }
        
        step 2:
        
            public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
            Objects.requireNonNull(mapper);
            return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
                                         StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
                @Override
                Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
                    return new Sink.ChainedReference<P_OUT, R>(sink) {
                        @Override
                        public void accept(P_OUT u) {
                            downstream.accept(mapper.apply(u));
                        }
                    };
                }
            };
        }
        
        step 3:
        
            public final <R, A> R collect(Collector<? super P_OUT, A, R> collector) {
                ...
                container = evaluate(ReduceOps.makeRef(collector));
                ...
        }
        
        step 4:
        
            final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
            assert getOutputShape() == terminalOp.inputShape();
            if (linkedOrConsumed)
                throw new IllegalStateException(MSG_STREAM_LINKED);
            linkedOrConsumed = true;
    
            return isParallel()
                   ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
                   : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
        }
        
        step 5:
        
        使用Fork/Join框架执行操作。
        
    
    

    上面的五个步骤是经过一些省略的,需要注意的一点是,intermediate类型的操作仅仅将操作加到一个upstream里面,具体的原文描述如下:

    
    Construct a new Stream by appending a stateless intermediate operation to an existing stream.
    

    比如上面我们的操作中的map操作,实际上只是将操作加到一个intermediate链条上面,不会立刻执行。重点是第五步,Stream是如何使用Fork/Join来实现并发的。evaluate这个方法至关重要,在方法里面会分开处理,对于设置了并发标志的操作流,会使用Fork/Join来并发执行操作任务,而对于没有打开并发标志的操作流,则串行执行操作。

    Fork/Join框架的核心方法是一个叫做compute的方法,下面分析一个forEach操作如何通过Fork/Join框架来实现并发,通过追踪代码,可以发现forEach的并发版本其实是一个交由一个ForEachTask对象来做,而ForEachTask类中实现了compute方法:

    // Similar to AbstractTask but doesn't need to track child tasks
            public void compute() {
                Spliterator<S> rightSplit = spliterator, leftSplit;
                long sizeEstimate = rightSplit.estimateSize(), sizeThreshold;
                if ((sizeThreshold = targetSize) == 0L)
                    targetSize = sizeThreshold = AbstractTask.suggestTargetSize(sizeEstimate);
                boolean isShortCircuit = StreamOpFlag.SHORT_CIRCUIT.isKnown(helper.getStreamAndOpFlags());
                boolean forkRight = false;
                Sink<S> taskSink = sink;
                ForEachTask<S, T> task = this;
                while (!isShortCircuit || !taskSink.cancellationRequested()) {
                    if (sizeEstimate <= sizeThreshold ||
                        (leftSplit = rightSplit.trySplit()) == null) {
                        task.helper.copyInto(taskSink, rightSplit);
                        break;
                    }
                    ForEachTask<S, T> leftTask = new ForEachTask<>(task, leftSplit);
                    task.addToPendingCount(1);
                    ForEachTask<S, T> taskToFork;
                    if (forkRight) {
                        forkRight = false;
                        rightSplit = leftSplit;
                        taskToFork = task;
                        task = leftTask;
                    }
                    else {
                        forkRight = true;
                        taskToFork = leftTask;
                    }
                    taskToFork.fork();
                    sizeEstimate = rightSplit.estimateSize();
                }
                task.spliterator = null;
                task.propagateCompletion();
            }
        }
    
    

    在上面的代码中将大任务拆成成了小任务,那哪里收集了这些小任务呢?看下面的代码:

            @Override
            public <S> Void evaluateParallel(PipelineHelper<T> helper,
                                             Spliterator<S> spliterator) {
                if (ordered)
                    new ForEachOrderedTask<>(helper, spliterator, this).invoke();
                else
                    new ForEachTask<>(helper, spliterator, helper.wrapSink(this)).invoke();
                return null;
            }
    

    可以看到调用了invoke方法,而对invoke的描述如下:

         * Commences performing this task, awaits its completion if
         * necessary, and returns its result, or throws an (unchecked)
         * {@code RuntimeException} or {@code Error} if the underlying
         * computation did so.
    

    不是说Fork/Join框架嘛?那有了fork为什么没有join而是invoke呢?下面是对join方法的描述:

    
         * Returns the result of the computation when it {@link #isDone is
         * done}.  This method differs from {@link #get()} in that
         * abnormal completion results in {@code RuntimeException} or
         * {@code Error}, not {@code ExecutionException}, and that
         * interrupts of the calling thread do <em>not</em> cause the
         * method to abruptly return by throwing {@code
         * InterruptedException}.
         
    

    根据join的描述,我们知道还可以使用get方法来获取结果,但是get方法会抛出异常而join和invoke方法都不会抛出异常,而是将异常报告给ForkJoinTask,让ForkJoinTask来抛出异常。

    相关文章

      网友评论

        本文标题:Java Stream的并行实现

        本文链接:https://www.haomeiwen.com/subject/urxwextx.html