美文网首页一些收藏Java 核心技术Java
java8(四)并行流和性能

java8(四)并行流和性能

作者: 我犟不过你 | 来源:发表于2021-10-25 11:25 被阅读0次

    在前面的章节中,我们简要地提到了 Stream 接口可以让你非常方便地处理它的元素:可以通过对收集源调用 parallelStream 方法来把集合转换为并行流。并行流就是一个把内容分成多个数据块,并用不同的线程分别处理每个数据块的流。这样一来,你就可以自动把给定操作的工作负荷分配给多核处理器的所有内核。

    在现实中,对顺序流调用 parallel 方法并不意味着流本身有任何实际的变化。它在内部实际上就是设了一个 boolean 标志,表示你想让调用 parallel 之后进行的所有操作都并行执行。类似地,你只需要对并行流调用 sequential 方法就可以把它变成顺序流

    一、性能测试

    在了解其具体内容之前,我们先来做一个测试。我们写一个对指定数值的自然数流进行求和操作,针对一个求和操作,执行10次,取出使用时间最短的操作。分别做传统for循环方式,使用Stream方式,以及并行Stream方式,代码如下所示:

    import java.util.function.Function;
    import java.util.stream.Stream;
    
    /**
     * @description: 测试并行的性能
     * @author:weirx
     * @date:2021/10/22 15:20
     * @version:3.0
     */
    public class TestStreamParallel {
    
        /**
         * description: 传入一个函数和一个数值,此方法会对传入的方法执行10次,取出最短执行时间
         *
         * @param adder
         * @param n
         * @return: long
         * @author: weirx
         * @time: 2021/10/22 15:29
         */
        public static long measureSumPerf(Function<Long, Long> adder, long n) {
            long fastest = Long.MAX_VALUE;
            for (int i = 0; i < 10; i++) {
                long start = System.nanoTime();
                adder.apply(n);
                long duration = (System.nanoTime() - start) / 1_000_000;
                if (duration < fastest) {
                    fastest = duration;
                }
            }
            return fastest;
        }
    
        /**
         * description: 对输入数值求和
         *
         * @param aLong
         * @return: java.lang.Long
         * @author: weirx
         * @time: 2021/10/25 10:02
         */
        private static Long testFor(Long aLong) {
    //        jdk1.7求和
            long result = 0;
            for (long i = 1L; i <= aLong; i++) {
                result += i;
            }
            return result;
        }
    
        /**
         * description: 对输入数值求和
         *
         * @param aLong
         * @return: java.lang.Long
         * @author: weirx
         * @time: 2021/10/25 10:02
         */
        private static Long testStreamParallel(Long aLong) {
    
    //        jdk1.8求和 - 并行
            return Stream.iterate(0L, i -> i + 1).limit(aLong).parallel().reduce(0L, Long::sum);
        }
    
        /**
         * description: 对输入数值求和
         *
         * @param aLong
         * @return: java.lang.Long
         * @author: weirx
         * @time: 2021/10/25 10:02
         */
        private static Long testStream(Long aLong) {
    
    //        jdk1.8求和 - 非并行
            return Stream.iterate(0L, i -> i + 1).limit(aLong).reduce(0L, Long::sum);
        }
    }
    

    传统for循环结果如下:

    
        public static void main(String[] args) {
            System.out.println("最短耗时:" + measureSumPerf(TestStreamParallel::testFor, 10000000) + "ms");
        }
    ------------------------------------------
    最短耗时:3ms
    

    Stream结果如下:

    
        public static void main(String[] args) {
            System.out.println("最短耗时:" + measureSumPerf(TestStreamParallel::testStream, 10000000) + "ms");
        }
    ------------------------------------------
    最短耗时:106ms
    

    StreamParallel结果如下:

    
        public static void main(String[] args) {
            System.out.println("最短耗时:" + measureSumPerf(TestStreamParallel::testStreamParallel, 10000000) + "ms");
        }
    ------------------------------------------
    最短耗时:131ms
    

    结果分析:
    1)用传统 for 循环的迭代版本执行起来应该会快很多,因为它更为底层,更重要的是不需要对原始类型做任何装箱或拆箱操作。
    2)使用Stream的方式要比传统for慢不少。
    3)使用并行Stream的方式,反而效率是最低的。

    那么产生上述问题的原因是什么?
    1) iterate 生成的是装箱的对象,必须拆箱成数字才能求和;
    2)很难把 iterate 分成多个独立块来并行执行,因为每次应用这个函数都要依赖前一次应用的结果,如下图所示:

    Stream-iterator.png

    整张数字列表在归纳的过程中并没有准备好,因而无法有效的把流划分为小块进行并行处理。当把流标记位并行的时候,其实是增加了开销,要把每次的求和操作分配到不同的线程上处理。

    综上所述:iterate是一个不易并行化的操作。甚至会使整个流操作的效率下降。

    如何合理的高效的解决上述问题呢?
    我们可以使用LongStream.rangeClosed这个操作,相比于iterate有两点优点:
    1)直接生产原始数据类型,没有拆箱和装箱的开销。
    2)生成数据范围,容易拆分成小块,便于并行。

    下面直接看结果:

        /**
         * description: 使用LongStream.rangeClosed求和
         * @param aLong
         * @return: java.lang.Long
         * @author: weirx
         * @time: 2021/10/25 10:53
         */
        private static Long testRangeClosed(Long aLong) {
            return LongStream.rangeClosed(0, aLong).reduce(0L, Long::sum);
        }
    
        public static void main(String[] args) {
            System.out.println("最短耗时:" + measureSumPerf(TestStreamParallel::testRangeClosed, 10000000) + "ms");
        }
    ---------------------------
    最短耗时:4ms
    

    如果对其使用并行方式呢?结果如下:

        /**
         * description: 使用LongStream.rangeClosed求和
         * @param aLong
         * @return: java.lang.Long
         * @author: weirx
         * @time: 2021/10/25 10:53
         */
        private static Long testRangeClosedParallel(Long aLong) {
            return LongStream.rangeClosed(0, aLong).parallel().reduce(0L, Long::sum);
        }
    
        public static void main(String[] args) {
            System.out.println("最短耗时:" + measureSumPerf(TestStreamParallel::testRangeClosedParallel, 10000000) + "ms");
        }
    --------------------------------------
    最短耗时:1ms
    

    二、正确并高效的使用并行流

    首先看下下面的一个错误用法,有如下的求和代码:

    import java.util.stream.LongStream;
    
    /**
     * @description: 并发情况下的流
     * @author:weirx
     * @date:2021/10/25 11:02
     * @version:3.0
     */
    public class ConcurrentForStreamParallel {
    
        /**
         * description: 调用Accumulator的add方法求和
         * @param n
         * @return: long
         * @author: weirx
         * @time: 2021/10/25 11:03
         */
        public static long sideEffectSum(long n) {
            Accumulator accumulator = new Accumulator();
            LongStream.rangeClosed(1, n).forEach(accumulator::add);
            return accumulator.total;
        }
    
        public static class Accumulator {
            public long total = 0;
    
            public void add(long value) {
                total += value;
            }
        }
    
        public static void main(String[] args) {
            for (int i = 0; i< 10 ;i++){
                System.out.println(sideEffectSum(10000000));
            }
        }
    
    -----------------------------------
    50000005000000
    50000005000000
    50000005000000
    50000005000000
    50000005000000
    50000005000000
    50000005000000
    50000005000000
    50000005000000
    50000005000000
    

    如果对这个代码使用并行操作,会有如下结果:

        public static long sideEffectSum(long n) {
            Accumulator accumulator = new Accumulator();
            LongStream.rangeClosed(1, n).parallel().forEach(accumulator::add);
            return accumulator.total;
        }
    
    ------------------------------------
    11448626323551
    10712400489958
    4825469864081
    5760309604570
    8135917300720
    13477296726050
    10068084182814
    7911075623394
    11626955826086
    8579850729746
    

    如上所示,直接出现了线程间的资源竞争问题,暂不说效率问题,连数据的正确性都无法保证了。

    下面总结一些关于使用并行流的一些建议:

    1)测试,最直观的方式,如果无法确定使用并行流能带来性能的提升,那么就如本文一样,找到最合适的方式。

    2)留意装箱。自动装箱和自动拆箱会使得性能大大的降低。应当使用 IntStream 、LongStream 、 DoubleStream 等原始流来避免这些操作。

    3) 有些操作本身在并行流上的性能就比顺序流差。特别是 limit 和 findFirst 等依赖于元素顺序的操作,它们在并行流上执行的代价非常大。例如, findAny 会比 findFirst 性能好,因为它不一定要按顺序来执行。如果你需要流中的n个元素而不是专门要前n个的话,对无序并行流调用limit 可能会比单个有序流更高效。

    4)考虑流水线的计算成本。假设处理元素个数N,Q是一个元素通过流水线的成本,则N*Q就是整个流水线的计算成本。Q的值越高,则使用并行流的效率可能越高。

    5)较小数据量使用并行流并不是一个好的选择。并行化本身也是有一定的开销的。

    6)考虑流背后的数据结构是否易于拆分。ArrayList比LinkedList的拆分效率高的多。前者不需要遍历,后者需要遍历。

    7)流本省的操作导致最终整个流水线的不确定性。比如filter操作过滤大量的元素,导致流的大小未知。

    8) 还要考虑终端操作中合并步骤的代价是大是小。(例如 Collector 中的 combiner 方法)。如果这一步代价很大,那么组合每个子流产生的部分结果所付出的代价就可能会超出通过并行流得到的性能提升。

    下面给出几种数据源的并行效率:

    可分解性
    ArrayList 极佳
    LinkedList
    IntStream.range 极佳
    Stream.iterate
    HashSet
    TreeSet

    相关文章

      网友评论

        本文标题:java8(四)并行流和性能

        本文链接:https://www.haomeiwen.com/subject/wapcaltx.html