美文网首页
高并发(7)- 线程并发工具类-Fork/Join

高并发(7)- 线程并发工具类-Fork/Join

作者: 残冬十九 | 来源:发表于2020-03-31 22:21 被阅读0次

    @[TOC](高并发(7)- 线程并发工具类-Fork/Join)

    前言

        上篇文章讲解了线程之间的协作。本篇就进入线程的并发工具类讲解。
    

    Fork/Join

    一、什么是Fork/Join
       Fork/Join是Jdk1.7之后提供的多线程并发处理框架,Fork/Join的核心思想就是分而治之。
    二、什么是分而治之
      分而治之就是将一个较为复杂的计算,按照我们设定好的阈值分解成多个计算,就是将一个大技术拆分成多个子任务(拆分到不可再拆分)处理,然后将一个小小的任务运算结果进行join汇总。  

       Fork/Join流程图
    三、实现
      forkJoin可以实现两个类
      RecursiveTask代表有返回值的任务,RecursiveAction代表没有返回值的任务。我们可以根据我们的需求使用具体的方法。
    四、代码实现

      我们定义一个场景,现有一个N长度的数组,我们需要会中数组中的值,通用方法就是循环数组来计算总数,使用 ForkJoin怎么实现呢,我们来看看下代码


    forkJon具体流程
    /**
     * @version 1.0
     * @Description forkJoinDemo
     * @Author wb.yang
     */
    public class ForkJoinDemo {
    
        /**
         * 数组长度
         */
        public static final int ARRAY_LENGTH = 100000000;
    
        public static int[] makeArray() {
    
            //new一个随机数发生器
            Random r = new Random();
            int[] result = new int[ARRAY_LENGTH];
            for (int i = 0; i < ARRAY_LENGTH; i++) {
                //用随机数填充数组
                result[i] = r.nextInt(ARRAY_LENGTH * 3);
            }
            return result;
    
        }
    
    
        private static class SumTask extends RecursiveTask<Integer> {
    
            private final static int THRESHOLD = ARRAY_LENGTH / 10;
            private int[] src; //表示我们要实际统计的数组
            private int fromIndex;//开始统计的下标
            private int toIndex;//统计到哪里结束的下标
    
            public SumTask(int[] src, int fromIndex, int toIndex) {
                this.src = src;
                this.fromIndex = fromIndex;
                this.toIndex = toIndex;
            }
    
            @Override
            protected Integer compute() {
                // 判断数组长度是否到我们的阈值
                if (toIndex - fromIndex < THRESHOLD) {
                    int count = 0;
                    //循环数组相加数据
                    for (int i = fromIndex; i <= toIndex; i++) {
                        count = count + src[i];
                    }
                    return count;
                } else {
                    //没有达到阈值,继续拆分任务
                    int mid = (fromIndex + toIndex) / 2;
                    SumTask left = new SumTask(src, fromIndex, mid);
                    SumTask right = new SumTask(src, mid + 1, toIndex);
                    invokeAll(left, right);
                    //组合结果
                    return left.join() + right.join();
                }
            }
        }
    
    
        public static void main(String[] args) {
            // 创建一个forkJin
            ForkJoinPool pool = new ForkJoinPool();
            //创建数组
            int[] src = makeArray();
            //创建我们自己的任务
            SumTask innerFind = new SumTask(src, 0, src.length - 1);
    
            long start = System.currentTimeMillis();
            // 同步调用
            pool.invoke(innerFind);
            System.out.println("Task is Running.....");
    
            System.out.println("The count is " + innerFind.join()
                    + " spend time:" + (System.currentTimeMillis() - start) + "ms");
    
        }
    }
    

    从上面代码可以看出,我们先创建了一个随机数生成函数,生成了指定长度的数组,我们的需求就是汇总数组里面值的总数,通用方法就是循环数组来计算总数,我们使用了forkJoin就是建立一个我们自己任务的类。因为我们需要汇总,需要返回值,所以需要使用RecursiveTask方法,然后SumTask继承了RecursiveTask类,重写了compute方法。
      在compute方法中,我们可以看到进入之后就进行阈值判断(这个阈值就是我们拆分之后的数组长度),判断数组长度是否达到了我们的阈值,如果没有达到阈值的,就需要继续拆分这个任务,在拆分成小任务,如果达到了我们的阈值之后,就可以将我们的数组里面的数据进行汇总,然后返回,最后将我们的结果进行join汇总返回。

       ForkJoin执行结果
      从结果中正是打印了执行结果,这个就是使用forkJoin的过程。

    相关文章

      网友评论

          本文标题:高并发(7)- 线程并发工具类-Fork/Join

          本文链接:https://www.haomeiwen.com/subject/alhnuhtx.html