美文网首页
线程的并发工具类 ForkJoin 框架

线程的并发工具类 ForkJoin 框架

作者: wuchao226 | 来源:发表于2021-04-09 17:48 被阅读0次

    ForkJoin 框架介绍

    ForkJoin 框架是一个实现了 ExecutorService 接口的多线程处理器,它专为那些可以通过递归分解成更细小的任务而设计,最大化的利用多核处理器来提高应用程序的性能。

    与其他ExecutorService相关的实现相同的是,ForkJoin 框架会将任务分配给线程池中的线程。而与之不同的是,ForkJoin 框架在执行任务时使用了工作窃取算法

    ForkJoin 原理

    将一个大任务,进行拆分成(fork)若干个小任务(拆到不可在拆时),在将一个个的小任务运算的结果进行 join 汇总。

    工作窃取算法

    工作窃取算法指的是在多线程执行不同任务队列的过程中,某个线程执行完自己队列的任务后从其他线程的任务队列里窃取任务来执行。

    工作窃取流程如下图所示:

    当一个线程窃取另一个线程的时候,为了减少两个任务线程之间的竞争,我们通常使用双端队列来存储任务。被窃取的任务线程都从双端队列的头部拿任务执行,而窃取其他任务的线程从双端队列的尾部执行任务。

    当一个线程在窃取任务时要是没有其他可用的任务了,这个线程会进入阻塞状态以等待再次“工作”。

    如此一来就能能够减少线程阻塞或是闲置的时间,提高 CPU 利用率

    ForkJoin 实战

    Fork/Join 框架就是对任务的分割与子任务的合并,所以要实现这个框架,先得有任务。在 ForkJoin 框架里提供了抽象类 ForkJoinTask 来实现任务。

    它提供在任务 中执行 fork 和 join 的操作机制,通常我们不直接继承 ForkjoinTask 类,只需要直接继承其子类。

      1. RecursiveAction,用于没有返回结果的任务
      1. RecursiveTask,用于有返回值的任务

    ForkJoinTask

    ForkJoinTask 是一个类似普通线程的实体,但是比普通线程轻量得多。
    fork()方法:使用线程池中的空闲线程异步提交任务

    // ForkJoinTask.java
    public final ForkJoinTask<V> fork() {
        Thread t;
        // ForkJoinWorkerThread 是执行 ForkJoinTask 的专有线程,由ForkJoinPool管理
        // 先判断当前线程是否是ForkJoin专有线程,如果是,则将任务push到当前线程所负责的队列里去
        if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
            ((ForkJoinWorkerThread)t).workQueue.push(this);
        else
            // 如果不是则将线程加入队列
            // 没有显式创建ForkJoinPool的时候走这里,提交任务到默认的common线程池中
            ForkJoinPool.common.externalPush(this);
        return this;
    }
    

    fork()主要作用就是把任务推入当前工作线程的工作队列里

    join()方法:等待处理任务的线程处理完毕,获得返回值。

    public final V join() {
        int s;
        // doJoin()方法来获取当前任务的执行状态
        if ((s = doJoin() & DONE_MASK) != NORMAL)
            // 任务异常,抛出异常
            reportException(s);
        // 任务正常完成,获取返回值
        return getRawResult();
    }
    /**
     * doJoin()方法用来返回当前任务的执行状态
     **/
     private int doJoin() {
        int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
        // 先判断任务是否执行完毕,执行完毕直接返回结果(执行状态)
        return (s = status) < 0 ? s :
            // 如果没有执行完毕,先判断是否是ForkJoinWorkThread线程
            ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
            // 如果是,先判断任务是否处于工作队列顶端(意味着下一个就执行它)
            // tryUnpush()方法判断任务是否处于当前工作队列顶端,是返回true
            // doExec()方法执行任务
            (w = (wt = (ForkJoinWorkerThread)t).workQueue).
            // 如果是处于顶端并且任务执行完毕,返回结果
            tryUnpush(this) && (s = doExec()) < 0 ? s :
            // 如果不在顶端或者在顶端却没未执行完毕,那就调用awitJoin()执行任务
            // awaitJoin():使用自旋使任务执行完成,返回结果
            wt.pool.awaitJoin(w, this, 0L) :
            // 如果不是ForkJoinWorkThread线程,执行externalAwaitDone()返回任务结果
            externalAwaitDone();
    }
    

    Thread.join()会使线程阻塞,而ForkJoinPool.join()会使线程免于阻塞,下面是ForkJoinPool.join()的流程图:

    join流程图.png

    RecursiveAction 和 RecursiveTask

    通常情况下,在创建任务的时候我们一般不直接继承 ForkJoinTask,而是继承它的子类RecursiveAction和RecursiveTask。

    两个都是ForkJoinTask的子类,RecursiveAction可以看做是无返回值的ForkJoinTask,RecursiveTask是有返回值的ForkJoinTask

    此外,两个子类都有执行主要计算的方法compute(),RecursiveAction 的compute() 返回 void,RecursiveTask 的 compute() 有具体的返回值。

    task 要通过 ForkJoinPool 来执行,使用 submit 或 invoke 提交,两者的区 别是:invoke 是同步执行,调用之后需要等待任务完成,才能执行后面的代码; submit 是异步执行。 join()和 get 方法当任务完成的时候返回计算结果。

    在我们自己实现的 compute 方法里,首先需要判断任务是否足够小,如果 足够小就直接执行任务。如果不足够小,就必须分割成两个子任务,每个子任务在调用 invokeAll 方法时,又会进入 compute 方法,看看当前子任务是否需要继续分割成孙任务,如果不需要继续分割,则执行当前子任务并返回结果。使用 join 方法会等待子任务执行完并得到其结果。

    ForkJoinPool

    ForkJoinPool 是用于执行ForkJoinTask任务的执行(线程)池。
    ForkJoinPool 管理着执行池中的线程和任务队列,此外,执行池是否还接受任务,显示线程的运行状态也是在这里处理。

    线程池最大的特点就是分叉(fork)合并(join)模式,将一个大任务拆分成多个小任务,并行执行,再结合工作窃取算法提高整体的执行效率,充分利用CPU资源。

    @sun.misc.Contended
    public class ForkJoinPool extends AbstractExecutorService {
        // 任务队列
        volatile WorkQueue[] workQueues;  
        // 线程的运行状态
        volatile int runState;
        // 创建ForkJoinWorkerThread的默认工厂,可以通过构造函数重写
        public static final ForkJoinWorkerThreadFactory defaultForkJoinWorkerThreadFactory;
        // 公用的线程池,其运行状态不受shutdown()和shutdownNow()的影响
        static final ForkJoinPool common;
    
    
        // 私有构造方法,没有任何安全检查和参数校验,由makeCommonPool直接调用
        // 其他构造方法都是源自于此方法
        // parallelism: 并行度,
        // 默认调用java.lang.Runtime.availableProcessors() 方法返回可用处理器的数量
        private ForkJoinPool(int parallelism,
                             ForkJoinWorkerThreadFactory factory,// 工作线程工厂
                             UncaughtExceptionHandler handler,// 拒绝任务的handler
                             int mode,// 同步模式
                             String workerNamePrefix) {// 线程名prefix
            this.workerNamePrefix = workerNamePrefix;
            this.factory = factory;
            this.ueh = handler;
            this.config = (parallelism & SMASK) | mode;
            long np = (long)(-parallelism); // offset ctl counts
            this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
        }
    }
    

    WorkQueue
    双端队列,ForkJoinTask 存放在这里。

    当工作线程在处理自己的工作队列时,会从队列首取任务来执行(FIFO);如果是窃取其他队列的任务时,窃取的任务位于所属任务队列的队尾(LIFO)。

    ForkJoinPool 与传统线程池最显著的区别就是它维护了一个工作队列数组(volatile WorkQueue[] workQueues,ForkJoinPool中的每个工作线程都维护着一个工作队列)。

    runState
    ForkJoinPool 的运行状态。SHUTDOWN 状态用负数表示,其他用2的幂次表示。

    ForkJoin 的使用

    ForkJoinPool 负责管理线程和任务,ForkJoinTask 实现 fork 和 join 操作,所以要使用 ForkJoin 框架就离不开这两个类了,只是在实际开发中我们常用 ForkJoinTask 的子类 RecursiveTask 和 RecursiveAction 来替代 ForkJoinTask。

    // 创建一个随机数组
    public class MakeArray {
        //数组长度
        public static final int ARRAY_LENGTH = 400;
        public final static int THRESHOLD = 47;
    
        public static int[] makeArray() {
            //new一个随机数发生器
            Random r = new Random();
            int[] result = new int[ARRAY_LENGTH];
            for (int i = 0; i < ARRAY_LENGTH; i++) {
                //用随机数填充数组
                result[i] = r.nextInt(ARRAY_LENGTH * 3);
            }
            return result;
        }
    }
    
    /**
     * 单线程执行累加
     */
    public class SumNormal {
    
        public static void main(String[] args) {
            int count = 0;
            int[] src = MakeArray.makeArray();
    
            long start = System.currentTimeMillis();
            for (int i = 0; i < src.length; i++) {
                SleepTools.ms(1);
                count = count + src[i];
            }
            System.out.println("The count is " + count
                    + " spend time:" + (System.currentTimeMillis() - start) + "ms");
        }
    }
    

    单线程执行累加的例子输出:


    /**
     * ForkJoin执行累加
     */
    public class SumArray {
        private static class SumTask extends RecursiveTask<Integer> {
    
            /*阈值*/
            private final static int THRESHOLD = MakeArray.ARRAY_LENGTH / 10;
            private int[] src;
            private int fromIndex;
            private int toIndex;
    
            public SumTask(int[] src, int fromIndex, int toIndex) {
                this.src = src;
                this.fromIndex = fromIndex;
                this.toIndex = toIndex;
            }
    
            @Override
            protected Integer compute() {
                /*任务的大小是否合适*/
                if (toIndex - fromIndex < THRESHOLD) {
    //                System.out.println(" from index = " + fromIndex
    //                        + " toIndex=" + toIndex);
                    int count = 0;
                    for (int i = fromIndex; i <= toIndex; i++) {
                        SleepTools.ms(1);
                        count = count + src[i];
                    }
                    return count;
                } else {
                    //fromIndex....mid.....toIndex
                    int mid = (fromIndex + toIndex) / 2;
                    SumTask left = new SumTask(src, fromIndex, mid);
                    SumTask right = new SumTask(src, mid + 1, toIndex);
                    invokeAll(left, right);
                    return left.join() + right.join();
                }
            }
        }
    
    
        public static void main(String[] args) {
    
            int[] src = MakeArray.makeArray();
            /*new出池的实例*/
            ForkJoinPool pool = new ForkJoinPool();
            /*new出Task的实例*/
            SumTask innerFind = new SumTask(src, 0, src.length - 1);
    
            long start = System.currentTimeMillis();
    
            pool.invoke(innerFind);
    //        System.out.println("Task is Running.....");
    
            System.out.println("The count is " + innerFind.join()
                    + " spend time:" + (System.currentTimeMillis() - start) + "ms");
    
        }
    }
    

    ForkJoin 执行累加例子输出:

    通过输出可以很明显的看出来,使用 ForkJoin 框架的效率要比单线程执行累加要高很多。

    ForkJoin 是使用多个线程协作来计算的,所以会有线程通信和线程切换的开销。如果要计算的任务比较简单,直接使用单线程会更快一些。但如果要计算的东西比较复杂,计算机又是多核的情况下,就可以充分利用多核CPU来提高计算速度。

    参考资料

    参考 https://www.bookstack.cn/read/RedSpider1-concurrent/article-03-18.md

    相关文章

      网友评论

          本文标题:线程的并发工具类 ForkJoin 框架

          本文链接:https://www.haomeiwen.com/subject/lehukltx.html