美文网首页
Java8 Stream 并行流

Java8 Stream 并行流

作者: 垃圾简书_吃枣药丸 | 来源:发表于2020-09-27 16:24 被阅读0次

    并行流就是把一系列数据自动拆分成多个数据块,并使用多个线程来处理这些数据块,这样就可以利用现代CPU多核的优势,把计算任务分配给多个CPU核心,最后再汇总结果。让它们都忙起来~

    # 并行流使用的线程池

    • 先来看看并行流所使用的线程
    public static void main(String[] args) {
        Random random = new Random();
    
        //非并行流
        Stream.generate(() -> random.nextInt(1000))
                .limit(10)
                .forEach(x -> System.out.println(x + ": " + Thread.currentThread().getName()));
    
        //并行流
        Stream.generate(() -> random.nextInt(1000))
                .limit(1000)
                //转换成并行流
                .parallel()
                .forEach(x -> System.out.println(x + ": " + Thread.currentThread().getName()));
    }
    
    • 结果: 可以看到并行流除了使用main线程,还使用了ForkJoinPool线程。

      image.png
    • ForkJoinPool使用演示

    @Getter
    @Setter
    @Slf4j
    public class ForkJoinCalculator extends RecursiveTask<Long> {
    
        /**
         * 获取通用的ForkJoinPool
         */
        private static final ForkJoinPool FORK_JOIN_POOL = ForkJoinPool.commonPool();
        /**
         * 最小批次元素数量
         */
        private static final int MIN_BATCH_SIZE = 1000;
    
        private long[] dataArray;
        private int startIndex;
        private int endIndex;
    
        public ForkJoinCalculator(long[] dataArray) {
            this.dataArray = dataArray;
            this.startIndex = 0;
            this.endIndex = dataArray.length;
        }
    
        private ForkJoinCalculator(long[] dataArray, int startIndex, int endIndex) {
            this.dataArray = dataArray;
            this.startIndex = startIndex;
            this.endIndex = endIndex;
        }
    
        @Override
        protected Long compute() {
            ForkJoinCalculator.printWithThread("cur startIndex=%s,endIndex=%s", startIndex, endIndex);
            long curTotal = 0L;
            if (endIndex - startIndex <= MIN_BATCH_SIZE) {
                //如果需要计算的元素个数小于最小阈值则直接计算
                for (int i = startIndex; i < endIndex; i++) {
                    curTotal += dataArray[i];
                }
                ForkJoinCalculator.printWithThread("直接计算curTotal=%s", curTotal);
            } else {
                // 如要要计算的元素个数大于设定的最小阈值,则进行任务拆分
                // 将元素startIndex~endIndex个任务拆分成两份
    
                // 计算中间索引
                int middleIndex = (startIndex + endIndex) / 2;
                ForkJoinCalculator leftForkJoinCalculator = new ForkJoinCalculator(dataArray, startIndex, middleIndex);
                ForkJoinCalculator rightForkJoinCalculator = new ForkJoinCalculator(dataArray, middleIndex, endIndex);
                // fork():将任务push到线程的工作队列
                // join(): 计算结果
                long leftTotal = leftForkJoinCalculator.fork().join();
                // 第二个子任务,有可能继续划分
                long rightTotal = rightForkJoinCalculator.compute();
                ForkJoinCalculator.printWithThread("leftTotal=%s,rightTotal=%s", leftTotal, rightTotal);
                curTotal = leftTotal + rightTotal;
            }
            return curTotal;
        }
    
        public static void printWithThread(String format, Object... args) {
            String formatStr = String.format(format, args);
            String threadName = Thread.currentThread().getName();
            System.out.println(threadName + ": " + formatStr);
        }
    }
    
    • 工作窃取: work stealing
      • 为了保证每个线程完成的任务量相对平均,每个线程都会将分配给自己的任务保存在一个双向队列,每执行完一个任务,就会从队列头取出下一个任务执行,如果当前线程比较优秀,早早地完成了自己队列内的所有任务,则会从其他线程的队列的尾巴上"窃取"一个任务来执行,直到所有线程的队列都清空,以保证以最快的速度执行完所有的任务。

    # 性能与注意点

    使用并行流并不保证性能一定比非并行流和for循环好,有时候可能更差,这取决于要处理的数据集的数据结构。并且,在使用并行流之前,必须确保用的对,否则可能出现计算结果错误的严重后果。请记住,并行化并不是没有代价的。

    • 演示一个错误使用并行流的例子
    public static class Add {
        private long total;
    
        public void add(long curVal) {
            total += curVal;
        }
    }
    
    public void streamDemo() {
        Add add = new Add();
        LongStream.rangeClosed(1, 1_000_000)
                .forEach(add::add);
        System.out.println(add.total);
    }
    
    public void parallelStreamDemo() {
        Add add = new Add();
        LongStream.rangeClosed(1, 1_000_000)
                .parallel()
                .forEach(add::add);
        System.out.println(add.total);
    }
    
    • 结果: 使用并行流计算出来的结果与正确结果500000500000出现了非常大的差异,这是因为在使用并行流时,多个线程同时访问total+=curVal;,会出现线程安全问题。当然,你可以将add()设置为synchronized同步方法,但是很显然性能会很差。
    500000500000
    53692171876
    

    # 高效使用并行流的建议

    • 如果不确定使用并行流是否能提高程序执行的效率:请测量、测试。使用并行流的结果并不一定会产生与我们预期相符的结果,最好的方法就是在使用之前做足够的基准测试来检测性能。
    • 注意装箱和拆箱成本,尽量使用原始类型特化流IntStream, LongStreamDoubleStream
    • 对于较少的数据量,使用并行流从来都不是一个好的决定。使用并行流带来的好处还抵不过线程的额外开销。
    • 因为并行流计算之前需要将数据集拆分,所以在使用并行流之前需要考虑数据集是否易于拆分。例如,ArrayList就比LinkedList易于拆分,因为ArrayList不需要遍历整个数据集就可拆分,而后者必须完整遍历。
    • 按照可拆分性:
    数据源 可拆分性
    ArrayList 极佳
    LinkedList
    Stream.range() 极佳
    Stream.iterate()
    HashSet
    TreeSet

    相关文章

      网友评论

          本文标题:Java8 Stream 并行流

          本文链接:https://www.haomeiwen.com/subject/xyukuktx.html