美文网首页
Bug 篇:Java 8 Parallel Stream 陷阱

Bug 篇:Java 8 Parallel Stream 陷阱

作者: 达微 | 来源:发表于2019-12-10 21:23 被阅读0次

    误区一:Java 8 中的 Stream 只要使用 parallel 就可以并行处理,只要使用 sequential 就可以单线程处理
    parallel 方法和 sequential 方法不会对流产生任何影响,只是改动了状态位而已
    Stream 是否并行取决于最后一次改变状态位的方法是 parallel 还是 sequential,下面两种表达式等价:

    stream.parallel().filter(null).sequential().map(null).parallel().collect(null);
    stream.parallel().filter(null).map(null).collect(null);
    

    误区二:只要简单地加上 parallel 方法,就能使 Stream 高效并行
    并行要求计算的规模要够大,确保在线程中运算的时间多过分配线程的开销
    并行的数据必须是无序并且无状态的,前者会影响速度,后者会影响结果正确性(状态可以理解为 Java 中共享变量)

    /* 从一加到一千万 */
    public class ParallelStreamToGetSumTest {
        
        private long num = 10_000_000;
    
        /****************************** sequectial *******************************/
    
        @Test // 正确的顺序计算
        public void sequential() {
            PerformanceUtils.test(() ->
                    LongStream.rangeClosed(1, num).sum());  // 运行时间 19 milliseconds
        }
    
        @Test // 错误的顺序计算,没有考虑拆箱的开销
        public void errorSequential() {
            PerformanceUtils.test(() ->
                    Stream.iterate(1L, i -> i + 1L).limit(num)
                            .reduce(0L, Long::sum)); // 运行时间 256 milliseconds
        }
    
        /****************************** parallel *******************************/
    
        @Test // 正确的并行计算
        public void parallel() {
            PerformanceUtils.test(() ->
                    LongStream.rangeClosed(1, num).parallel().sum()); // 运行时间 4 milliseconds
        }
    
        @Test // 错误的并行计算,没有考虑开箱的开销与违反了无序性原则(iterate 没有范围,是顺序产生的)
        public void errorParallel() {
            PerformanceUtils.test(() ->
                    Stream.iterate(1L, i -> i + 1L).limit(num)
                            .parallel().reduce(0L, Long::sum)); // 运行时间 1680 milliseconds
        }
    
        @Test // 错误的并行计算,使用了共享变量,导致了线程竞争,并且共享变量的累加不是原子性的,所以结果也是错误的
        public void errorParallel2() {
            PerformanceUtils.test(() -> {
                Accumulator accumulator = new Accumulator();
                Stream.iterate(1L, i -> i + 1L).limit(num).parallel().forEach(accumulator::add);
                return accumulator.total;
            }); // 运行时间 1125 milliseconds,但结果错误
        }
    
        /****************************** inner class *******************************/
    
        private static class Accumulator {
        
            private long total = 0;
    
            private void add(long value) {
                total += value;
            }
        }
    }
    

    P.S. PerformanceUnitls 是笔者写的性能测试工具,源码位于 https://github.com/free-myself/commons/blob/master/src/main/java/org/tree/commons/utils/PerformanceUtils.java

    相关文章

      网友评论

          本文标题:Bug 篇:Java 8 Parallel Stream 陷阱

          本文链接:https://www.haomeiwen.com/subject/qxkngctx.html