JUC并行计算框架 Fork/Join 原理图文详解&代码示例

作者: 光剑书架上的书 | 来源:发表于2020-05-03 16:57 被阅读0次

    关键词:divide and conquer algorithm,work-stealing,WorkQueue

    ForkJoinPool 是什么?

    ForkJoinPool 是 JDK 7 中,@author Doug Lea 加入的一个线程池类。Fork/Join 框架的核心原理就是分治算法(Divide-and-Conquer)和工作窃取算法(work-stealing algorithm)。

    Fork分解任务成独立的子任务,用多线程去执行这些子任务,Join合并子任务的结果。这样就能使用多线程的方式来执行一个任务。

    JDK7引入的Fork/Join有三个核心类:

    • ForkJoinPool,执行任务的线程池
    • ForkJoinWorkerThread,执行任务的工作线程
    • ForkJoinTask,一个用于ForkJoinPool的任务抽象类。

    ForkJoinPool是框架的核心,不同于其他线程池,它的构建不需要提供核心线程数,最大线程数,阻塞队列等,还增加了未捕获异常处理器,而该处理器会交给工作线程,由该线程处理,这样的好处在于当一个线程的工作队列上的某个任务出现异常时,不至于结束掉线程,而是让它继续运行队列上的其他任务。它会依托于并行度(或默认根据核数计算)来决定最大线程数,它内部维护了WorkQueue数组ws取代了阻塞队列,ws中下标为奇数的为工作线程的所属队列,偶数的为共享队列,虽然名称有所区分,但重要的区别只有一点:共享队列不存在工作线程。

    ForkJoinPool 的状态控制变量有:

        // runState bits: SHUTDOWN must be negative, others arbitrary powers of two
        private static final int  RSLOCK     = 1;
        private static final int  RSIGNAL    = 1 << 1;
        private static final int  STARTED    = 1 << 2;
        private static final int  STOP       = 1 << 29;
        private static final int  TERMINATED = 1 << 30;
        private static final int  SHUTDOWN   = 1 << 31;
    
        // Instance fields
        volatile long ctl;                   // main pool control
        volatile int runState;               // lockable status
        final int config;                    // parallelism, mode
        int indexSeed;                       // to generate worker index
        volatile WorkQueue[] workQueues;     // main registry
        final ForkJoinWorkerThreadFactory factory;
        final UncaughtExceptionHandler ueh;  // per-worker UEH
        final String workerNamePrefix;       // to create worker name string
        volatile AtomicLong stealCounter;    // also used as sync monitor
    

    ForkJoinPool维护了一个ctl控制信号,前16位表示活跃worker数,33至48位表示worker总数,后32位可以粗略理解用于表示worker等待队列的栈顶。ForkJoinPool利用这个ctl,WorkQueue的scanState和stackPred以及ws的索引算法维护了一个类似队列(或者叫栈更贴切一些)的数据结构。每当有一个线程偷不到任务,就会存放此前的ctl后置标记位到pred,并将自己的索引交给ctl作为栈顶。相应的唤醒操作则由栈顶起。相应的方法在进行尝试添加worker时,会综合当前是否有阻塞等待任务的线程。

    当所有线程都不能窃取到新的任务,进入等待队列时,称之为“静寂态”。

    ForkJoinPool对全局全状的修改需要加锁进行,这些操作如修改ctl(改变栈顶,增删活跃数或总数等),处理ws中的元素,扩容ws,关闭线程池,初始化(包含ws的初始化),注册线程入池等。而这个锁就是runState,它除了当锁,也间接表示了运行状态,相应的线程池的SHUTDOWN,STOP,TERMINATED等状态均与其相应的位有关。

    线程池的并行度保存在config字段的后16位,config的第17位决定了是FIFO还是LIFO。而这个并行度也通过间接地取反并计入到ctl的前32位,线程池中判断是否当前有活跃的线程,或者是否已进入寂静态,都是用保存在config的并行度和保存在ctl前32位的活跃数与并行度的运算结果进行相加,判断是否会溢出(正数)来决定的。

    ForkJoinPool还提供了补偿机制,用于在线程将要阻塞在执行过程中前释放掉一个正在空闲的工作线程或创建一个新的工作线程,从而保证了并行度。

    因为ForkJoinTask比较复杂,抽象方法比较多,日常使用时一般不会继承ForkJoinTask来实现自定义的任务,而是继承ForkJoinTask的两个子类:

    • RecursiveTask:子任务带返回结果时使用
    • RecursiveAction:子任务不带返回结果时使用

    关于Fork/Join框架的原理,可参考:Doug Lea的文章:A Java Fork/Join Framework http://gee.cs.oswego.edu/dl/papers/fj.pdf

    框架模型

    ForkJoinPool 不是为了替代 ExecutorService,而是它的补充,在某些应用场景下性能比 ExecutorService 更好。

    ForkJoinPool 主要用于实现“分而治之”的算法,特别是分治之后递归调用的函数,例如 quick sort 等。

    ForkJoinPool 最适合的是计算密集型的任务,如果存在 I/O,线程间同步,sleep() 等会造成线程长时间阻塞的情况时,最好配合使用 ManagedBlocker。

    ForkJoinPool 分治算法思想

    分治(divide and conquer),也就是把一个复杂的问题分解成相似的子问题,然后子问题再分子问题,直到问题分的很简单不必再划分了。然后层层返回子问题的结果,最终合并返回问题结果。

    分治在算法上有很多应用,类似大数据的MapReduce,归并算法、快速排序算法等。JUC中的Fork/Join的并行计算框架类似于单机版的 MapReduce。

    Fork/Join ,从字面上我们就可以理解,分治的过程分为两个阶段,第一个阶段分解任务(fork),把任务分解为一个个小任务直至小任务可以简单的计算返回结果。

    第二阶段合并结果(join),把每个小任务的结果合并返回得到最终结果。而Fork就是分解任务,Join就是合并结果。

    Fork/Join框架主要包含两部分:ForkJoinPool、ForkJoinTask。

    ForkJoinPool就是治理分治任务的线程池。它和在之前的文章提到ThreadPoolExecutor线程池,共同点都是消费者-生产者模式的实现,但是有一些不同。

    ThreadPoolExecutor的线程池是只有一个任务队列的,而ForkJoinPool有多个任务队列。通过ForkJoinPool的invoke或submit或execute提交任务的时候会根据一定规则分配给不同的任务队列,并且任务队列的双端队列。

    ForkJoinPool 的每个工作线程都维护着一个工作队列(WorkQueue),这是一个双端队列(Deque),里面存放的对象是任务(ForkJoinTask)。 其中ForkJoinTask代表一个可以并行、合并的任务。ForkJoinTask是一个抽象类,它还有两个抽象子类:RecusiveAction和RecusiveTask。其中RecusiveTask代表有返回值的任务,而RecusiveAction代表没有返回值的任务。

    ForkJoinPool 与 ExecutorService

    ForkJoinPool是ExecutorService的实现类,因此是一种特殊的线程池。
    使用方法:创建了ForkJoinPool实例之后,就可以调用ForkJoinPool的submit(ForkJoinTask task) 或invoke(ForkJoinTask task)方法来执行指定任务了。

    那么,使用ThreadPoolExecutor或者ForkJoinPool,会有什么性能的差异呢?

    使用ForkJoinPool能够使用数量有限的线程来完成非常多的具有父子关系的任务,比如使用4个线程来完成超过200万个任务。但是,使用ThreadPoolExecutor时,是不可能完成的,因为ThreadPoolExecutor中的Thread无法选择优先执行子任务,需要完成200万个具有父子关系的任务时,也需要200万个线程,显然这是不可行的。

    这正是工作窃取模式的优点。

    ForkJoinPool is an advanced version of ThreadPoolExecutor with concepts like work-stealing which enable faster and efficient solving of divide and conquer algorithms.

    我们常用的数组工具类 Arrays 在JDK 8之后新增的并行排序方法(parallelSort)就运用了 ForkJoinPool 的特性,还有 ConcurrentHashMap 在JDK 8之后添加的函数式方法(如forEach等)也有运用。在整个JUC框架中,ForkJoinPool 相对其他类会复杂很多。

    创建 ForkJoinPool 对象

    ForkJoinPool作者Doug Lea 在ForkJoinPool主类的注释说明中,有这样一句话:

    A static commonPool() is available and appropriate for most applications. The common pool is used by any ForkJoinTask that is not explicitly submitted to a specified pool.
    Using the common pool normally reduces resource usage (its threads are slowly reclaimed during periods of non-use, and reinstated upon subsequent use).

    以上描述大致的中文解释是:ForkJoinPools类有一个静态方法commonPool(),这个静态方法所获得的ForkJoinPools实例是由整个应用进程共享的,并且它适合绝大多数的应用系统场景。使用commonPool通常可以帮助应用程序中多种需要进行归并计算的任务共享计算资源,从而使后者发挥最大作用(ForkJoinPools中的工作线程在闲置时会被缓慢回收,并在随后需要使用时被恢复),而这种获取ForkJoinPools实例的方式,才是Doug Lea推荐的使用方式。代码如下:

    ForkJoinPool commonPool =  ForkJoinPool.commonPool();
    

    我们可以看到:

        /**
         * Returns the common pool instance. This pool is statically
         * constructed; its run state is unaffected by attempts to {@link
         * #shutdown} or {@link #shutdownNow}. However this pool and any
         * ongoing processing are automatically terminated upon program
         * {@link System#exit}.  Any program that relies on asynchronous
         * task processing to complete before program termination should
         * invoke {@code commonPool().}{@link #awaitQuiescence awaitQuiescence},
         * before exit.
         *
         * @return the common pool instance
         * @since 1.8
         */
        public static ForkJoinPool commonPool() {
            // assert common != null : "static init error";
            return common;
        }
    

    其中,common 是:

        /**
         * Common (static) pool. Non-null for public use unless a static
         * construction exception, but internal usages null-check on use
         * to paranoically [偏执地] avoid potential initialization circularities
         * as well as to simplify generated code.
         */
        static final ForkJoinPool common;
    

    而这个 common 成员变量是在 ForkJoinPool 源代码, 用一段静态代码初始化的:

    static {
            // initialize field offsets for CAS etc
            try {
                U = sun.misc.Unsafe.getUnsafe();
                Class<?> k = ForkJoinPool.class;
                CTL = U.objectFieldOffset
                    (k.getDeclaredField("ctl"));
                RUNSTATE = U.objectFieldOffset
                    (k.getDeclaredField("runState"));
                STEALCOUNTER = U.objectFieldOffset
                    (k.getDeclaredField("stealCounter"));
                Class<?> tk = Thread.class;
                PARKBLOCKER = U.objectFieldOffset
                    (tk.getDeclaredField("parkBlocker"));
                Class<?> wk = WorkQueue.class;
                QTOP = U.objectFieldOffset
                    (wk.getDeclaredField("top"));
                QLOCK = U.objectFieldOffset
                    (wk.getDeclaredField("qlock"));
                QSCANSTATE = U.objectFieldOffset
                    (wk.getDeclaredField("scanState"));
                QPARKER = U.objectFieldOffset
                    (wk.getDeclaredField("parker"));
                QCURRENTSTEAL = U.objectFieldOffset
                    (wk.getDeclaredField("currentSteal"));
                QCURRENTJOIN = U.objectFieldOffset
                    (wk.getDeclaredField("currentJoin"));
                Class<?> ak = ForkJoinTask[].class;
                ABASE = U.arrayBaseOffset(ak);
                int scale = U.arrayIndexScale(ak);
                if ((scale & (scale - 1)) != 0)
                    throw new Error("data type scale not a power of two");
                ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
            } catch (Exception e) {
                throw new Error(e);
            }
    
            commonMaxSpares = DEFAULT_COMMON_MAX_SPARES;
            defaultForkJoinWorkerThreadFactory =
                new DefaultForkJoinWorkerThreadFactory();
            modifyThreadPermission = new RuntimePermission("modifyThread");
    
            common = java.security.AccessController.doPrivileged
                (new java.security.PrivilegedAction<ForkJoinPool>() {
                    public ForkJoinPool run() { return makeCommonPool(); }});
            int par = common.config & SMASK; // report 1 even if threads disabled
            commonParallelism = par > 0 ? par : 1;
        }
    
        /**
         * Creates and returns the common pool, respecting user settings
         * specified via system properties.
         */
        private static ForkJoinPool makeCommonPool() {
            int parallelism = -1;
            ForkJoinWorkerThreadFactory factory = null;
            UncaughtExceptionHandler handler = null;
            try {  // ignore exceptions in accessing/parsing properties
                String pp = System.getProperty
                    ("java.util.concurrent.ForkJoinPool.common.parallelism");
                String fp = System.getProperty
                    ("java.util.concurrent.ForkJoinPool.common.threadFactory");
                String hp = System.getProperty
                    ("java.util.concurrent.ForkJoinPool.common.exceptionHandler");
                if (pp != null)
                    parallelism = Integer.parseInt(pp);
                if (fp != null)
                    factory = ((ForkJoinWorkerThreadFactory)ClassLoader.
                               getSystemClassLoader().loadClass(fp).newInstance());
                if (hp != null)
                    handler = ((UncaughtExceptionHandler)ClassLoader.
                               getSystemClassLoader().loadClass(hp).newInstance());
            } catch (Exception ignore) {
            }
            if (factory == null) {
                if (System.getSecurityManager() == null)
                    factory = defaultForkJoinWorkerThreadFactory;
                else // use security-managed default
                    factory = new InnocuousForkJoinWorkerThreadFactory();
            }
            if (parallelism < 0 && // default 1 less than #cores 处理器核数
                (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)
                parallelism = 1;
            if (parallelism > MAX_CAP)
                parallelism = MAX_CAP;
            return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE,
                                    "ForkJoinPool.commonPool-worker-");
        }
    

    工作线程和工作队列

    工作线程

    ForkJoinWorkerThread是运行在ForkJoinPool中的线程,它内部会维护一个存放ForkJoinTask的WorkQueue队列,而WorkQueue是ForkJoinPool的内部类。

    一个WorkQueue工作队列所属的归并计算工作线程ForkJoinWorkerThread,用来执行 ForkJoinTask 。 注意,工作队列也可能不属于任何工作线程。

    /**
     * A thread managed by a {@link ForkJoinPool}, which executes
     * {@link ForkJoinTask}s.
     * This class is subclassable solely for the sake of adding
     * functionality -- there are no overridable methods dealing with
     * scheduling or execution.  However, you can override initialization
     * and termination methods surrounding the main task processing loop.
     * If you do create such a subclass, you will also need to supply a
     * custom {@link ForkJoinPool.ForkJoinWorkerThreadFactory} to
     * {@linkplain ForkJoinPool#ForkJoinPool use it} in a {@code ForkJoinPool}.
     *
     * @since 1.7
     * @author Doug Lea
     */
    public class ForkJoinWorkerThread extends Thread {
        /*
         * ForkJoinWorkerThreads are managed by ForkJoinPools and perform
         * ForkJoinTasks. For explanation, see the internal documentation
         * of class ForkJoinPool.
         *
         * This class just maintains links to its pool and WorkQueue.  The
         * pool field is set immediately upon construction, but the
         * workQueue field is not set until a call to registerWorker
         * completes. This leads to a visibility race, that is tolerated
         * by requiring that the workQueue field is only accessed by the
         * owning thread.
         *
         * Support for (non-public) subclass InnocuousForkJoinWorkerThread
         * requires that we break quite a lot of encapsulation (via Unsafe)
         * both here and in the subclass to access and set Thread fields.
         */
    
        final ForkJoinPool pool;                // the pool this thread works in
        final ForkJoinPool.WorkQueue workQueue; // work-stealing mechanics
    }
    

    工作队列(WorkQueue)

    为啥要双端队列呢?因为ForkJoinPool有一个机制,当某个工作线程对应消费的任务队列空闲的时候它会去别的忙的任务队列的尾部分担(stealing)任务过来执行(好伙伴啊)。然后那个忙的任务队列还是头部出任务给它对应的工作线程消费。这样双端就井然有序,不会有任务争抢的情况。看,这就是朴素的大师级的设计思想啊。

    以下代码片段示例了WorkQueue类中定义的一些重要属性:

    static final class WorkQueue {
        ......
        // 队列状态
        volatile int qlock;        // 1: locked, < 0: terminate; else 0
        // 下一个出队元素的索引位(主要是为线程窃取准备的索引位置)
        volatile int base;         // index of next slot for poll
        // 下一个入队元素准备的索引位
        int top;                   // index of next slot for push
        // 队列中使用数组存储元素
        ForkJoinTask<?>[] array;   // the elements (initially unallocated)
        // 队列所属的ForkJoinPool(可能为空)
        // 注意,一个ForkJoinPool中会有多个执行线程,还会有比执行线程更多的(或一样多的)队列
        final ForkJoinPool pool;   // the containing pool (may be null)
        // 这个队列所属的归并计算工作线程。注意,工作队列也可能不属于任何工作线程
        final ForkJoinWorkerThread owner; // owning thread or null if shared
        // 记录当前正在进行join等待的其它任务
        volatile ForkJoinTask<?> currentJoin;  // task being joined in awaitJoin
        // 当前正在偷取的任务
        volatile ForkJoinTask<?> currentSteal; // mainly used by helpStealer
        ......
    }
    

    入队

    当工作队列的 owner ,也就是ForkJoinWorkerThread 需要向双端队列中放入一个新的待执行子任务时,会调用WorkQueue中的push方法。

            /**
             * Pushes a task. Call only by owner in unshared queues.  (The
             * shared-queue version is embedded in method externalPush.)
             *
             * @param task the task. Caller must ensure non-null.
             * @throws RejectedExecutionException if array cannot be resized
             */
            final void push(ForkJoinTask<?> task) {
                ForkJoinTask<?>[] a; ForkJoinPool p;
    // base: 下一个出队元素的索引位(主要是为线程窃取准备的索引位置)
    // top: 下一个入队元素准备的索引位
                int b = base, s = top, n;
                if ((a = array) != null) {    // ignore if queue removed
                    int m = a.length - 1;     // fenced write for task visibility
    // 在指定的对象 ForkJoinTask<?>[] a 中,指定的内存偏移量 ((m & s) << ASHIFT) + ABASE 的位置,赋予一个新的元素
                    U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);
    // 将workQueue对象本身中的top标示的位置 + 1
                    U.putOrderedInt(this, QTOP, s + 1);
                    if ((n = s - b) <= 1) {
                        if ((p = pool) != null)
                            p.signalWork(p.workQueues, this);
                    }
                    else if (n >= m)
                        growArray();
                }
            }
    
    

    这里使用Unsafe.putOrderedObject方法,这个方法在对低延迟代码是很有用的,它能够实现非堵塞的写入,这些写入不会被Java的JIT重新排序指令(instruction reordering),
    这样它使用快速的存储-存储(store-store) barrier,
    而不是较慢的存储-加载(store-load) barrier, (用在volatile的写操作上)
    这种性能提升是有代价的,虽然便宜,也就是写后结果并不会被其他线程看到,甚至是自己的线程,通常是几纳秒后被其他线程看到,这个时间比较短,所以代价可以忍受。

    类似Unsafe.putOrderedObject还有unsafe.putOrderedLong等方法,unsafe.putOrderedLong比使用 volatile long要快3倍左右。

    sun.misc.Unsafe操作类直接基于操作系统控制层在硬件层面上进行原子操作,它是ForkJoinPool高效性能的一大保证,类似的编程思路还体现在java.util.concurrent包中相当规模的类功能实现中。实际上sun.misc.Unsafe操作类在Java中有着举足轻重的地位,例如基于这个类实现的Java乐观锁机制。

    出队

    当ForkJoinWorkerThread需要从双端队列中取出下一个待执行子任务,就会根据设定的asyncMode调用双端队列的不同方法,代码概要如下所示:

    
        /**
         * If the current thread is operating in a ForkJoinPool,
         * unschedules and returns, without executing, the next task
         * queued by the current thread but not yet executed, if one is
         * available, or if not available, a task that was forked by some
         * other thread, if available. Availability may be transient, so a
         * {@code null} result does not necessarily imply quiescence of
         * the pool this task is operating in.  This method is designed
         * primarily to support extensions, and is unlikely to be useful
         * otherwise.
         *
         * @return a task, or {@code null} if none are available
         */
        protected static ForkJoinTask<?> pollTask() {
            Thread t; ForkJoinWorkerThread wt;
            return ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
                (wt = (ForkJoinWorkerThread)t).pool.nextTaskFor(wt.workQueue) :
                null;
        }
    
        /**
         * Gets and removes a local or stolen task for the given worker.
         *
         * @return a task, if available
         */
        final ForkJoinTask<?> nextTaskFor(WorkQueue w) {
            for (ForkJoinTask<?> t;;) {
                WorkQueue q; int b;
                if ((t = w.nextLocalTask()) != null)
                    return t;
                if ((q = findNonEmptyStealQueue()) == null)
                    return null;
                if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null)
                    return t;
            }
        }
    
    
            /**
             * Takes next task, if one exists, in order specified by mode.
             */
            final ForkJoinTask<?> nextLocalTask() {
                return (config & FIFO_QUEUE) == 0 ? pop() : poll();
            }
    
        // Specialized scanning
    
        /**
         * Returns a (probably) non-empty steal queue, if one is found
         * during a scan, else null.  This method must be retried by
         * caller if, by the time it tries to use the queue, it is empty.
         */
        private WorkQueue findNonEmptyStealQueue() {
            WorkQueue[] ws; int m;  // one-shot version of scan loop
            int r = ThreadLocalRandom.nextSecondarySeed();
            if ((ws = workQueues) != null && (m = ws.length - 1) >= 0) {
                for (int origin = r & m, k = origin, oldSum = 0, checkSum = 0;;) {
                    WorkQueue q; int b;
                    if ((q = ws[k]) != null) {
                        if ((b = q.base) - q.top < 0)
                            return q;
                        checkSum += b;
                    }
                    if ((k = (k + 1) & m) == origin) {
                        if (oldSum == (oldSum = checkSum))
                            break;
                        checkSum = 0;
                    }
                }
            }
            return null;
        }
    

    工作窃取算法(work-stealing)

    ForkJoinPool 的核心特性是它使用了work-stealing(工作窃取)算法:线程池内的所有工作线程都尝试找到并执行已经提交的任务,或者是被其他活动任务创建的子任务(如果不存在就阻塞等待)。

    这种特性使得 ForkJoinPool 在运行多个可以产生子任务的任务,或者是提交的许多小任务时效率更高。尤其是构建异步模型的 ForkJoinPool 时,对不需要合并(join)的事件类型任务也非常适用。

    在 ForkJoinPool 中,线程池中每个工作线程(ForkJoinWorkerThread)都对应一个任务队列(WorkQueue),工作线程优先处理来自自身队列的任务(LIFO或FIFO顺序,参数 mode 决定),然后以FIFO的顺序随机窃取其他队列中的任务。


    ForkJoinPool 中的任务分为两种:一种是本地提交的任务(Submission task,如 execute、submit 提交的任务);另外一种是 fork 出的子任务(Worker task)。两种任务都会存放在 WorkQueue 数组中,但是这两种任务并不会混合在同一个队列里,ForkJoinPool 内部使用了一种随机哈希算法(有点类似 ConcurrentHashMap 的桶随机算法)将工作队列与对应的工作线程关联起来,Submission 任务存放在 WorkQueue 数组的偶数索引位置,Worker 任务存放在奇数索引位。实质上,Submission 与 Worker 一样,只不过他它们被限制只能执行它们提交的本地任务,在后面的源码解析中,我们统一称之为“Worker”。
    任务的分布情况如下图:

    两种策略:
    Helping-帮助运行:如果偷取还未开始,为这些 joiner 安排一些它可以执行的其它任务。
    Compensating-补偿运行:如果没有足够的活动线程,tryCompensate()可能创建或重新激活一个备用的线程来为被阻塞的 joiner 补偿运行。
    第三种形式(在方法 tryRemoveAndExec 中实现)相当于帮助一个假想的补偿线程来运行任务:如果补偿线程窃取并执行的是被join的任务,那么 join 线程不需要补偿线程就可以直接执行它(尽管牺牲了更大的运行时堆栈,但这种权衡通常是值得的)。设想一下这种互相帮助的场景:补偿线程帮助 join 线程执行任务,反过来 join 线程也会帮助补偿线程执行任务。

    helpStealer(补偿执行)使用了一种“线性帮助(linear helping)”的算法。每个工作线程都记录了最近一个从其他工作队列(或 submission 队列)偷取过来的任务("currentSteal"引用),同样也记录了当前被 join 的任务(currentJoin 引用)。helpStealer 方法使用这些标记去尝试找到偷取者并帮助它执行任务,(也就是说,从偷取任务中拿到任务并执行,“偷取者偷我的任务执行,我去偷偷取者的任务执行”),这样就可以加速任务的执行。这种算法在 ForkJoinPool 中的大概实现方式如下:

    从 worker 到 steal 之间我们只保存依赖关系,而不是记录每个 steal 任务。有时可能需要对 workQueues 进行线性扫描来定位偷取者,但是一般不需要,因为偷取者在偷取任务时会把他的索引存放在在 hint 引用里。一个 worker 可能进行了多个偷取操作,但只记录了其中一个偷取者的索引(通常是最近的那个),为了节省开销,hint 在需要时才会记录。
    它是相对“浅层的”,忽略了嵌套和可能发生的循环相互偷取。
    "currentJoin"引用只有在 join 的时候被更新,这意味着我们在执行生命周期比较长的任务时会丢失链接,导致GC停转(在这种情况下利用阻塞通常是一个好的方案)。
    我们使用 checksum 限制查找任务的次数,然后挂起工作线程,必要时使用其他工作线程替换它。
    注意:CountedCompleter 的帮助动作不需要追踪"currentJoin":helpComplete 方法会获取并执行在同一个父节点下等待的所有任务。不过,这仍然需要对 completer 的链表进行遍历,所以使用 CountedCompleters 相对于直接定位"currentJoin"效率要低。

    补偿执行的目的不在于在任何给定的时间内保持未阻塞线程的目标并行数。这个类之前的版本为所有阻塞的join任务都提供即时补偿,然而,在实践中,绝大多数阻塞都是由GC和其他JVM或OS活动产生的瞬时的附带影响,这种情况下使用补偿线程替换工作线程会使情况变得更糟。现在,通过检查 WorkQueue.scanState 的状态确认所有活动线程都正在运行,然后使用补偿操作消除多余的活跃线程数。补偿操作通常情况下是被忽略运行的(容忍少数线程),因为它带来的利益很少:当一个有着空等待队列的工作线程在 join 时阻塞,它仍然会有足够的线程来保证活性,所以不需要进行补偿。补偿机制可能是有界限的。commonPool的界限(commonMaxSpares)使 JVM 在资源耗尽之前能更好的处理程序错误和资源滥用。用户可能通过自定义工厂来限制线程的构造,所以界限的作用在这种 pool 中是不精确的。当线程撤销(deregister)时,工作线程的总数会随之减少,不用等到他们退出并且资源被 JVM 和 OS 回收时才减少工作线程数,所以活跃线程在此瞬间可能会超过界限。

    简单应用

    这样分治思想用递归实现的经典案例就是斐波那契数列了。

    斐波那契数列:1、1、2、3、5、8、13、21、34、……
    公式 :F(1)=1,F(2)=1, F(n)=F(n-1)+F(n-2)(n>=3,n∈N*)

    源代码:

    package i
    
    import java.util.concurrent.ForkJoinPool
    import java.util.concurrent.ForkJoinTask
    import java.util.concurrent.RecursiveTask
    
    /**
     * @author: Jack
     * 2020-05-03 16:08
     * 斐波那契数列 计算第20个斐波那契数列
     * 以递归的方法定义:F(1)=1,F(2)=1, F(n)=F(n-1)+F(n-2)(n>=2,n∈N*)
     */
    
    
    class FibonacciComputation(var n: Int) : RecursiveTask<Long>() {
        override fun compute(): Long {
            return when {
                n <= 1 -> 1L
                n == 2 -> 1L
                else -> {
                    val f1 = FibonacciComputation(n - 1)
                    val f2 = FibonacciComputation(n - 2)
                    ForkJoinTask.invokeAll(f1, f2)
                    Thread.sleep(10)
                    f1.join() + f2.join()
                }
            }
        }
    }
    
    fun main() {
    
        val n = 10
    
        run {
            val pool = ForkJoinPool.commonPool()
    
            val s = System.currentTimeMillis()
            val fn = pool.invoke(FibonacciComputation(n))
            val t = System.currentTimeMillis()
    
            println(pool)
            println("fn=$fn")
            println("Time=${t - s}ms")
        }
    
        run {
            val s = System.currentTimeMillis()
            val fn = fib(n)
            val t = System.currentTimeMillis()
            println("fn=$fn")
            println("Time=${t - s}ms")
        }
    
    }
    
    
    fun fib(n: Int): Long {
        return when {
            n <= 1 -> 1L
            n == 2 -> 1L
            else -> {
                val f1 = fib(n - 1)
                val f2 = fib(n - 2)
                // 为了模拟计算密集型任务,我们在这里sleep 10ms
                Thread.sleep(10)
                f2 + f1
            }
        }
    }
    
    

    运行测试的结果:

    java.util.concurrent.ForkJoinPool@4b1210ee[Running, parallelism = 11, size = 11, active = 0, running = 0, steals = 24, tasks = 0, submissions = 0]
    fn=55
    Time=116ms
    fn=55
    Time=603ms
    

    可以发现在有CPU密集计算任务的场景的时候,并行计算框架FJ表现的性能非常棒。

    机器环境:

    硬件概览:
    
      型号名称: MacBook Pro
      型号标识符:    MacBookPro15,1
      处理器名称:    Intel Core i7
      处理器速度:    2.6 GHz
      处理器数目:    1
      核总数:  6
      L2 缓存(每个核):   256 KB
      L3 缓存:    12 MB
      超线程技术:    已启用
      内存:   16 GB
      Boot ROM 版本:  220.260.171.0.0 (iBridge: 16.16.5200.0.0,0)
      序列号(系统):  C02Z43JXLVCF
      硬件 UUID:  FAAEE2DB-8F7C-54B1-A0B7-F286C27EA35F
    
    

    另外,我们再举一个计算大量元素数组元素的例子:

    package i
    
    import java.util.concurrent.ForkJoinPool
    import java.util.concurrent.ForkJoinTask
    import java.util.concurrent.RecursiveTask
    import java.util.stream.LongStream
    
    /**
     * @author: Jack
     * 2020-05-03 16:08
     * 斐波那契数列 计算第20个斐波那契数列
     * 以递归的方法定义:F(1)=1,F(2)=1, F(n)=F(n-1)+F(n-2)(n>=2,n∈N*)
     */
    
    
    class Calculator(var numbers: LongArray, var start: Int, var end: Int) : RecursiveTask<Long>() {
        override fun compute(): Long {
            // 当需要计算的数字个数小于6时,直接采用for loop方式计算结果
            if (end - start < 6) {
                var sum = 0L
                for (i in start..end) {
                    sum += numbers[i]
                    Thread.sleep(10)
                }
                return sum
            }
    
            // 把任务一分为二,递归拆分(注意此处有递归)到底拆分成多少分 需要根据具体情况而
            val middle = (start + end) / 2
            val left = Calculator(numbers, start, middle)
            val right = Calculator(numbers, middle + 1, end)
            ForkJoinTask.invokeAll(left, right)
            // 为了模拟计算密集型任务,我们在这里sleep 10ms
            return left.join() + right.join()
        }
    }
    
    fun sum(numbers: LongArray): Long {
        var sum = 0L
        for (i in numbers) {
            sum += i
            // 为了模拟计算密集型任务,我们在这里sleep 10ms
            Thread.sleep(10)
        }
        return sum
    }
    
    
    fun main() {
    
        val numbers = LongStream.rangeClosed(1, 100).toArray()
    
        run {
            val calculator = Calculator(numbers, 0, numbers.size - 1)
            val pool = ForkJoinPool.commonPool()
    
            val s = System.currentTimeMillis()
            val fn = pool.invoke(calculator)
            val t = System.currentTimeMillis()
    
            println(pool)
            println("fn=$fn")
            println("Time=${t - s}ms")
        }
    
        run {
            val s = System.currentTimeMillis()
            val fn = sum(numbers)
            val t = System.currentTimeMillis()
    
            println("fn=$fn")
            println("Time=${t - s}ms")
        }
    
    }
    
    

    运行结果:

    java.util.concurrent.ForkJoinPool@3941a79c[Running, parallelism = 11, size = 11, active = 0, running = 0, steals = 17, tasks = 0, submissions = 0]
    fn=5050
    Time=137ms
    fn=5050
    Time=1169ms
    

    参考链接

    https://www.jianshu.com/p/32a15ef2f1bf
    https://www.jianshu.com/p/9ca3a95cb61a
    https://blog.csdn.net/tyrroo/article/details/81483608
    https://www.jianshu.com/p/32a15ef2f1bf
    http://gee.cs.oswego.edu/dl/papers/fj.pdf
    https://segmentfault.com/a/1190000019635250


    Kotlin开发者社区

    专注分享 Java、 Kotlin、Spring/Spring Boot、MySQL、redis、neo4j、NoSQL、Android、JavaScript、React、Node、函数式编程、编程思想、"高可用,高性能,高实时"大型分布式系统架构设计主题。

    High availability, high performance, high real-time large-scale distributed system architecture design

    分布式框架:Zookeeper、分布式中间件框架等
    分布式存储:GridFS、FastDFS、TFS、MemCache、redis等
    分布式数据库:Cobar、tddl、Amoeba、Mycat
    云计算、大数据、AI算法
    虚拟化、云原生技术
    分布式计算框架:MapReduce、Hadoop、Storm、Flink等
    分布式通信机制:Dubbo、RPC调用、共享远程数据、消息队列等
    消息队列MQ:Kafka、MetaQ,RocketMQ
    怎样打造高可用系统:基于硬件、软件中间件、系统架构等一些典型方案的实现:HAProxy、基于Corosync+Pacemaker的高可用集群套件中间件系统
    Mycat架构分布式演进
    大数据Join背后的难题:数据、网络、内存和计算能力的矛盾和调和
    Java分布式系统中的高性能难题:AIO,NIO,Netty还是自己开发框架?
    高性能事件派发机制:线程池模型、Disruptor模型等等。。。

    合抱之木,生于毫末;九层之台,起于垒土;千里之行,始于足下。不积跬步,无以至千里;不积小流,无以成江河。

    相关文章

      网友评论

        本文标题:JUC并行计算框架 Fork/Join 原理图文详解&代码示例

        本文链接:https://www.haomeiwen.com/subject/jngdghtx.html