美文网首页
Fork/Join框架分析

Fork/Join框架分析

作者: tracy_668 | 来源:发表于2023-01-14 14:56 被阅读0次

    [TOC]

    Fork/Join框架基本使用

    java.util.concurrent.ForkJoinPool由Java大师Doug Lea主持编写,它可以将一个大的任务拆分成多个子任务进行并行处理,最后将子任务结果合并成最后的计算结果,并进行输出。本文中对Fork/Join框架的讲解,基于JDK1.8+中的Fork/Join框架实现,参考的Fork/Join框架主要源代码也基于JDK1.8+。

    这里是一个简单的Fork/Join框架使用示例,在这个示例中我们计算了1-1001累加后的值:

    /**
     * 这是一个简单的Join/Fork计算过程,将1—1001数字相加
     */
    public class TestForkJoinPool {
    
        private static final Integer MAX = 200;
    
        static class MyForkJoinTask extends RecursiveTask<Integer> {
            // 子任务开始计算的值
            private Integer startValue;
    
            // 子任务结束计算的值
            private Integer endValue;
    
            public MyForkJoinTask(Integer startValue , Integer endValue) {
                this.startValue = startValue;
                this.endValue = endValue;
            }
    
            @Override
            protected Integer compute() {
                // 如果条件成立,说明这个任务所需要计算的数值分为足够小了
                // 可以正式进行累加计算了
                if(endValue - startValue < MAX) {
                    System.out.println("开始计算的部分:startValue = " + startValue + ";endValue = " + endValue);
                    Integer totalValue = 0;
                    for(int index = this.startValue ; index <= this.endValue  ; index++) {
                        totalValue += index;
                    }
                    return totalValue;
                }
                // 否则再进行任务拆分,拆分成两个任务
                else {
                    MyForkJoinTask subTask1 = new MyForkJoinTask(startValue, (startValue + endValue) / 2);
                    subTask1.fork();
                    MyForkJoinTask subTask2 = new MyForkJoinTask((startValue + endValue) / 2 + 1 , endValue);
                    subTask2.fork();
                    return subTask1.join() + subTask2.join();
                }
            }
        }
    
        public static void main(String[] args) {
            // 这是Fork/Join框架的线程池
            ForkJoinPool pool = new ForkJoinPool();
            ForkJoinTask<Integer> taskFuture =  pool.submit(new MyForkJoinTask(1,1001));
            try {
                Integer result = taskFuture.get();
                System.out.println("result = " + result);
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace(System.out);
            }
        }
    }
    

    以上代码很简单,在关键的位置有相关的注释说明。这里本文再对以上示例中的要点进行说明。首先看看以上示例代码的可能执行结果:

    开始计算的部分:startValue = 1;endValue = 126
    开始计算的部分:startValue = 127;endValue = 251
    开始计算的部分:startValue = 252;endValue = 376
    开始计算的部分:startValue = 377;endValue = 501
    开始计算的部分:startValue = 502;endValue = 626
    开始计算的部分:startValue = 627;endValue = 751
    开始计算的部分:startValue = 752;endValue = 876
    开始计算的部分:startValue = 877;endValue = 1001
    result = 501501
    

    工作顺序图

    下图展示了以上代码的工作过程概要,但实际上Fork/Join框架的内部工作过程要比这张图复杂得多,例如如何决定某一个recursive task是使用哪条线程进行运行;再例如如何决定当一个任务/子任务提交到Fork/Join框架内部后,是创建一个新的线程去运行还是让它进行队列等待。

    所以如果不深入理解Fork/Join框架的运行原理,只是根据之上最简单的使用例子观察运行效果,那么我们只能知道子任务在Fork/Join框架中被拆分得足够小后,并且其内部使用多线程并行完成这些小任务的计算后再进行结果向上的合并动作,最终形成顶层结果。不急,一步一步来,我们先从这张概要的过程图开始讨论。

    image.png

    图中最顶层的任务使用submit方式被提交到Fork/Join框架中,后者将前者放入到某个线程中运行,工作任务中的compute方法的代码开始对这个任务T1进行分析。如果当前任务需要累加的数字范围过大(代码中设定的是大于200),则将这个计算任务拆分成两个子任务(T1.1和T1.2),每个子任务各自负责计算一半的数据累加,请参见代码中的fork方法。如果当前子任务中需要累加的数字范围足够小(小于等于200),就进行累加然后返回到上层任务中。

    ForkJoinPool构造函数

    ForkJoinPool有四个构造函数,其中参数最全的那个构造函数如下所示:

    public ForkJoinPool(int parallelism,
                            ForkJoinWorkerThreadFactory factory,
                            UncaughtExceptionHandler handler,
                            boolean asyncMode)
    
    • parallelism:可并行级别,Fork/Join框架将依据这个并行级别的设定,决定框架内并行执行的线程数量。并行的每一个任务都会有一个线程进行处理,但是千万不要将这个属性理解成Fork/Join框架中最多存在的线程数量,也不要将这个属性和ThreadPoolExecutor线程池中的corePoolSize、maximumPoolSize属性进行比较,因为ForkJoinPool的组织结构和工作方式与后者完全不一样。而后续的讨论中,读者还可以发现Fork/Join框架中可存在的线程数量和这个参数值的关系并不是绝对的关联(有依据但并不全由它决定)。

    • factory:当Fork/Join框架创建一个新的线程时,同样会用到线程创建工厂。只不过这个线程工厂不再需要实现ThreadFactory接口,而是需要实现ForkJoinWorkerThreadFactory接口。后者是一个函数式接口,只需要实现一个名叫newThread的方法。在Fork/Join框架中有一个默认的ForkJoinWorkerThreadFactory接口实现:DefaultForkJoinWorkerThreadFactory。

    • handler:异常捕获处理器。当执行的任务中出现异常,并从任务中被抛出时,就会被handler捕获。

    • asyncMode:这个参数也非常重要,从字面意思来看是指的异步模式,它并不是说Fork/Join框架是采用同步模式还是采用异步模式工作。Fork/Join框架中为每一个独立工作的线程准备了对应的待执行任务队列,这个任务队列是使用数组进行组合的双向队列。即是说存在于队列中的待执行任务,即可以使用先进先出的工作模式,也可以使用后进先出的工作模式。

    当asyncMode设置为ture的时候,队列采用先进先出方式工作;反之则是采用后进先出的方式工作,该值默认为false:

    ......
    asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
    ......
    

    ForkJoinPool还有另外两个构造函数,一个构造函数只带有parallelism参数,既是可以设定Fork/Join框架的最大并行任务数量;另一个构造函数则不带有任何参数,对于最大并行任务数量也只是一个默认值——当前操作系统可以使用的CPU内核数量(Runtime.getRuntime().availableProcessors())。实际上ForkJoinPool还有一个私有的、原生构造函数,之上提到的三个构造函数都是对这个私有的、原生构造函数的调用。

    ......
    private ForkJoinPool(int parallelism,
                             ForkJoinWorkerThreadFactory factory,
                             UncaughtExceptionHandler handler,
                             int mode,
                             String workerNamePrefix) {
            this.workerNamePrefix = workerNamePrefix;
            this.factory = factory;
            this.ueh = handler;
            this.config = (parallelism & SMASK) | mode;
            long np = (long)(-parallelism); // offset ctl counts
            this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
        }
    ......
    

    如果你对Fork/Join框架没有特定的执行要求,可以直接使用不带有任何参数的构造函数。也就是说推荐基于当前操作系统可以使用的CPU内核数作为Fork/Join框架内最大并行任务数量,这样可以保证CPU在处理并行任务时,尽量少发生任务线程间的运行状态切换(实际上单个CPU内核上的线程间状态切换基本上无法避免,因为操作系统同时运行多个线程和多个进程)。

    fork方法和join方法

    Fork/Join框架中提供的fork方法和join方法,可以说是该框架中提供的最重要的两个方法,它们和parallelism“可并行任务数量”配合工作,可以导致拆分的子任务T1.1、T1.2甚至TX在Fork/Join框架中不同的运行效果。例如TX子任务或等待其它已存在的线程运行关联的子任务,或在运行TX的线程中“递归”执行其它任务,又或者启动一个新的线程运行子任务……

    fork方法用于将新创建的子任务放入当前线程的work queue队列中,Fork/Join框架将根据当前正在并发执行ForkJoinTask任务的ForkJoinWorkerThread线程状态,决定是让这个任务在队列中等待,还是创建一个新的ForkJoinWorkerThread线程运行它,又或者是唤起其它正在等待任务的ForkJoinWorkerThread线程运行它。

    这里面有几个元素概念需要注意,ForkJoinTask任务是一种能在Fork/Join框架中运行的特定任务,也只有这种类型的任务可以在Fork/Join框架中被拆分运行和合并运行。ForkJoinWorkerThread线程是一种在Fork/Join框架中运行的特性线程,它除了具有普通线程的特性外,最主要的特点是每一个ForkJoinWorkerThread线程都具有一个独立的任务等待队列(work queue),这个任务队列用于存储在本线程中被拆分的若干子任务。

    image.png

    join方法用于让当前线程阻塞,直到对应的子任务完成运行并返回执行结果。或者,如果这个子任务存在于当前线程的任务等待队列(work queue)中,则取出这个子任务进行“递归”执行。其目的是尽快得到当前子任务的运行结果,然后继续执行。

    使用归并算法解决排序问题

    排序问题是我们工作中的常见问题。目前也有很多现成算法是为了解决这个问题而被发明的,例如多种插值排序算法、多种交换排序算法。而并归排序算法是目前所有排序算法中,平均时间复杂度较好(O(nlgn)),算法稳定性较好的一种排序算法。它的核心算法思路将大的问题分解成多个小问题,并将结果进行合并。

    image.png

    整个算法的拆分阶段,是将未排序的数字集合,从一个较大集合递归拆分成若干较小的集合,这些较小的集合要么包含最多两个元素,要么就认为不够小需要继续进行拆分。

    那么对于一个集合中元素的排序问题就变成了两个问题:1、较小集合中最多两个元素的大小排序;2、如何将两个有序集合合并成一个新的有序集合。第一个问题很好解决,那么第二个问题是否会很复杂呢?实际上第二个问题也很简单,只需要将两个集合同时进行一次遍历即可完成——比较当前集合中最小的元素,将最小元素放入新的集合,它的时间复杂度为O(n):

    以下是归并排序算法的简单实现:

    import java.util.Arrays;
    import java.util.Random;
    
    /**
     * 归并排序
     */
    public class Merge1 {
    
        private static int MAX = 10000;
    
        private static int inits[] = new int[MAX];
    
        // 这是为了生成一个数量为MAX的随机整数集合,准备计算数据
        // 和算法本身并没有什么关系
        static {
            Random r = new Random();
            for(int index = 1 ; index <= MAX ; index++) {
                inits[index - 1] = r.nextInt(10000000);
            }
        }
    
        public static void main(String[] args) {
            long beginTime = System.currentTimeMillis();
            int results[] = forkits(inits); 
            long endTime = System.currentTimeMillis();
            // 如果参与排序的数据非常庞大,记得把这种打印方式去掉
            System.out.println("耗时=" + (endTime - beginTime) + " | " + Arrays.toString(results));       
        }
    
        // 拆分成较小的元素或者进行足够小的元素集合的排序
        private static int[] forkits(int source[]) {
            int sourceLen = source.length;
            if(sourceLen > 2) {
                int midIndex = sourceLen / 2;
                int result1[] = forkits(Arrays.copyOf(source, midIndex));
                int result2[] = forkits(Arrays.copyOfRange(source, midIndex , sourceLen));
                // 将两个有序的数组,合并成一个有序的数组
                int mer[] = joinInts(result1 , result2);
                return mer;
            } 
            // 否则说明集合中只有一个或者两个元素,可以进行这两个元素的比较排序了
            else {
                // 如果条件成立,说明数组中只有一个元素,或者是数组中的元素都已经排列好位置了
                if(sourceLen == 1
                    || source[0] <= source[1]) {
                    return source;
                } else {
                    int targetp[] = new int[sourceLen];
                    targetp[0] = source[1];
                    targetp[1] = source[0];
                    return targetp;
                }
            }
        }
    
        /**
         * 这个方法用于合并两个有序集合
         * @param array1
         * @param array2
         */
        private static int[] joinInts(int array1[] , int array2[]) {
            int destInts[] = new int[array1.length + array2.length];
            int array1Len = array1.length;
            int array2Len = array2.length;
            int destLen = destInts.length;
    
            // 只需要以新的集合destInts的长度为标准,遍历一次即可
            for(int index = 0 , array1Index = 0 , array2Index = 0 ; index < destLen ; index++) {
                int value1 = array1Index >= array1Len?Integer.MAX_VALUE:array1[array1Index];
                int value2 = array2Index >= array2Len?Integer.MAX_VALUE:array2[array2Index];
                // 如果条件成立,说明应该取数组array1中的值
                if(value1 < value2) {
                    array1Index++;
                    destInts[index] = value1;
                }
                // 否则取数组array2中的值
                else {
                    array2Index++;
                    destInts[index] = value2;
                }
            }
    
            return destInts;
        }
    }
    

    以上归并算法对1万条随机数进行排序只需要2-3毫秒,对10万条随机数进行排序只需要20毫秒左右的时间,对100万条随机数进行排序的平均时间大约为160毫秒(这还要看随机生成的待排序数组是否本身的凌乱程度)。可见归并算法本身是具有良好的性能的。使用JMX工具和操作系统自带的CPU监控器监视应用程序的执行情况,可以发现整个算法是单线程运行的,且同一时间CPU只有单个内核在作为主要的处理内核工作:

    使用Fork/Join运行归并算法

    但是随着待排序集合中数据规模继续增大,以上归并算法的代码实现就有一些力不从心了,例如以上算法对1亿条随机数集合进行排序时,耗时为27秒左右。

    接着我们可以使用Fork/Join框架来优化归并算法的执行性能,将拆分后的子任务实例化成多个ForkJoinTask任务放入待执行队列,并由Fork/Join框架在多个ForkJoinWorkerThread线程间调度这些任务。如下图所示:

    以下为使用Fork/Join框架后的归并算法代码,请注意joinInts方法中对两个有序集合合并成一个新的有序集合的代码,

    
    

    使用Fork/Join框架优化后,同样执行1亿条随机数的排序处理时间大约在14秒左右,当然这还和待排序集合本身的凌乱程度、CPU性能等有关系。但总体上这样的方式比不使用Fork/Join框架的归并排序算法在性能上有30%左右的性能提升。以下为执行时观察到的CPU状态和线程状态:

    除了归并算法代码实现内部可优化的细节处,使用Fork/Join框架后,我们基本上在保证操作系统线程规模的情况下,将每一个CPU内核的运算资源同时发挥了出来。

    Fork/Join Pool实例化

    实际上在之前文章中给出的Fork/Join Pool使用实例中,我们使用的new ForkJoinPool()或者new ForkJoinPool(N)这些方式来进行操作,这并不是ForkJoinPool作者Doug Lea推荐的使用方式。在ForkJoinPool主类的注释说明中,有这样一句话:

    A static commonPool() is available and appropriate for most applications. The common pool is used by any ForkJoinTask that is not explicitly submitted to a specified pool.
    Using the common pool normally reduces resource usage (its threads are slowly reclaimed during periods of non-use, and reinstated upon subsequent use).

    以上描述大致的中文解释是:ForkJoinPools类有一个静态方法commonPool(),这个静态方法所获得的ForkJoinPools实例是由整个应用进程共享的,并且它适合绝大多数的应用系统场景。使用commonPool通常可以帮助应用程序中多种需要进行归并计算的任务共享计算资源,从而使后者发挥最大作用(ForkJoinPools中的工作线程在闲置时会被缓慢回收,并在随后需要使用时被恢复),而这种获取ForkJoinPools实例的方式,才是Doug Lea推荐的使用方式。代码如下:

    ......
    ForkJoinPool commonPool =  ForkJoinPool.commonPool();
    ......
    

    工作线程和工作队列

    在本小节中我们主要讨论ForkJoinPool中处理ForkJoinTask任务及其子任务的情况,而ForkJoinPool处理Runnable或者Callable类型任务的情况将在后文讨论。ForkJoinPool中主要的工作线程,采用ForkJoinWorkerThread定义,其中有两个主要属性pool和workQueue:

    public class ForkJoinWorkerThread extends Thread {
        ......
        // the pool this thread works in
        final ForkJoinPool pool;
        // work-stealing mechanics
        final ForkJoinPool.WorkQueue workQueue;
        ......
    }
    

    pool属性表示这个进行归并计算的线程所属的ForkJoinPool实例,workQueue属性是java.util.concurrent.ForkJoinPool.WorkQueue这个类的实例,它表示这个线程所使用的子任务待执行队列,而且可以被其它工作线程偷取任务。后者的内部是一个数组结构,并使用一些关键属性记录这个队列的实时状态,更具体的来说这个WorkQueue是一个双端队列

    Java中还有一组类似的双端队列顶层接口java.util.Deque、java.util.concurrent.BlockingDeque,但应该是出于实现细节的考虑,WorkQueue这个双端队列并没有实现这些接口。所谓双端队列,就是说队列中的元素(ForkJoinTask任务及其子任务)可以从一端入队出队,还可以从另一端入队出队。这个双端队列将用于支持ForkJoinPool的两种异步模型(asyncMode):后进先出(LIFO_QUEUE)和先进先出(FIFO_QUEUE)。以下代码片段示例了WorkQueue类中定义的一些重要属性:

    ......
    static final class WorkQueue {
        ......
        // 队列状态
        volatile int qlock;        // 1: locked, < 0: terminate; else 0
        // 下一个出队元素的索引位(主要是为线程窃取准备的索引位置)
        volatile int base;         // index of next slot for poll
        // 为下一个入队元素准备的索引位
        int top;                   // index of next slot for push
        // 队列中使用数组存储元素
        ForkJoinTask<?>[] array;   // the elements (initially unallocated)
        // 队列所属的ForkJoinPool(可能为空)
        // 注意,一个ForkJoinPool中会有多个执行线程,还会有比执行线程更多的(或一样多的)队列
        final ForkJoinPool pool;   // the containing pool (may be null)
        // 这个队列所属的归并计算工作线程。注意,工作队列也可能不属于任何工作线程
        final ForkJoinWorkerThread owner; // owning thread or null if shared
        // 记录当前正在进行join等待的其它任务
        volatile ForkJoinTask<?> currentJoin;  // task being joined in awaitJoin
        // 当前正在偷取的任务
        volatile ForkJoinTask<?> currentSteal; // mainly used by helpStealer
        ......
    }
    ......
    

    当ForkJoinWorkerThread需要向双端队列中放入一个新的待执行子任务时,会调用WorkQueue中的push方法。我们来看看这个方法的主要执行过程(请注意,源代码来自JDK1.8,它和JDK1.7中的实现有显著不同):

    /**
     * Pushes a task. Call only by owner in unshared queues.  (The
     * shared-queue version is embedded in method externalPush.)
     */
    final void push(ForkJoinTask<?> task) {
        ForkJoinTask<?>[] a; ForkJoinPool p;
        int b = base, s = top, n;
        // 请注意,在执行task.fork时,触发push情况下,array不会为null
        // 因为在这之前workqueue中的array已经完成了初始化(在工作线程初始化时就完成了)
        if ((a = array) != null) {
        int m = a.length - 1;     // fenced write for task visibility
        // U常量是java底层的sun.misc.Unsafe操作类
        // 这个类提供硬件级别的原子操作
        // putOrderedObject方法在指定的对象a中,指定的内存偏移量的位置,赋予一个新的元素
        U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);
        // putOrderedInt方法对当前指定的对象中的指定字段,进行赋值操作
        // 这里的代码意义是将workQueue对象本身中的top标示的位置 + 1,
        U.putOrderedInt(this, QTOP, s + 1);
        if ((n = s - b) <= 1) {
            if ((p = pool) != null)
            // Tries to create or activate a worker if too few are active.
            // signalWork方法的意义在于,在当前活动的工作线程过少的情况下,创建新的工作线程
            p.signalWork(p.workQueues, this);
        }
        // 如果array的剩余空间不够了,则进行增加
        else if (n >= m)
            growArray();
        }
    }
    

    sun.misc.Unsafe操作类直接基于操作系统控制层在硬件层面上进行原子操作,它是ForkJoinPool高效性能的一大保证,类似的编程思路还体现在java.util.concurrent包中相当规模的类功能实现中。实际上sun.misc.Unsafe操作类在Java中有着举足轻重的地位,当ForkJoinWorkerThread需要从双端队列中取出下一个待执行子任务,就会根据设定的asyncMode调用双端队列的不同方法,代码概要如下所示:

    // 试图从指定的队列中取出下一个待执行任务
    final ForkJoinTask<?> nextTaskFor(WorkQueue w) {
        for (ForkJoinTask<?> t;;) {
            WorkQueue q; int b;
            // 该方法试图从“w”这个队列获取下一个待处理子任务
            if ((t = w.nextLocalTask()) != null)
                return t;
            // 如果没有获取到,则使用findNonEmptyStealQueue方法
            // 随机得到一个元素非空,并且可以进行任务窃取的存在于ForkJoinPool中的其它队列
            // 这个队列被记为“q”
            if ((q = findNonEmptyStealQueue()) == null)
                return null;
            // 试图从“q”这个队列base位处取出待执行任务
            if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null)
                return t;
        }
    }
    
    ......
    
    /**
     * Takes next task, if one exists, in order specified by mode.
     */
    final ForkJoinTask<?> nextLocalTask() {
        // 如果asyncMode设定为后进先出(LIFO)
        // 则使用pop()从双端队列的前端取出任务
        // 否则就是先进先出模式(LIFO),使用poll()从双端队列的后端取出任务
        return (config & FIFO_QUEUE) == 0 ? pop() : poll();
    }
    ......
    

    ForkJoinPool中的队列

    那么ForkJoinPool是怎样创建队列的呢?请看如下两段源代码片段:

    /**
     * Tries to add the given task to a submission queue at
     * submitter's current queue. Only the (vastly) most common path
     * is directly handled in this method, while screening for need
     * for externalSubmit.
     */
    // ForkJoinPool类中的方法
    // 该方法试图将一个任务提交到一个submission queue中,随机提交
    final void externalPush(ForkJoinTask<?> task) {
        WorkQueue[] ws; WorkQueue q; int m;
        // 取得一个随机探查数,可能为0也可能为其它数
        int r = ThreadLocalRandom.getProbe();
        // 获取当前ForkJoinPool的运行状态
        int rs = runState;
        // 最关键的操作在这里,详见后文说明
        if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&
            (q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 &&
            U.compareAndSwapInt(q, QLOCK, 0, 1)) {
    
            ForkJoinTask<?>[] a; int am, n, s;
            if ((a = q.array) != null && (am = a.length - 1) > (n = (s = q.top) - q.base)) {
            int j = ((am & s) << ASHIFT) + ABASE;
            // 以下三个原子操作首先是将task放入队列
            U.putOrderedObject(a, j, task);
            // 然后将“q”这个submission queue的top标记+1
            U.putOrderedInt(q, QTOP, s + 1);
            // 最后解除这个submission queue的锁定状态
            U.putIntVolatile(q, QLOCK, 0);
    
            // 如果条件成立,说明这时处于active的工作线程可能还不够
            // 所以调用signalWork方法
            if (n <= 1)
                signalWork(ws, q);
            return;
            }
        // 这里试图接除对这个submission queue的锁定状态
        // 为什么会有两次接触呢?因为在之前代码中给队列加锁后,
        // 可能队列的现有空间并不满足添加新的task的条件
            U.compareAndSwapInt(q, QLOCK, 1, 0);
        }
    
        externalSubmit(task);
    }
    
    ......
    
    /**
     * Full version of externalPush, handling uncommon cases, as well
     * as performing secondary initialization upon the first
     * submission of the first task to the pool.  It also detects
     * first submission by an external thread and creates a new shared
     * queue if the one at index if empty or contended.
     */
    // 以下是externalSubmit方法的部分代码,用于初始化ForkJoinPool中的队列
    private void externalSubmit(ForkJoinTask<?> task) {
        ......
        // initialize
        // 如果条件成立,就说明当前ForkJoinPool类中,还没有任何队列,所以要进行队列初始化
        else if ((rs & STARTED) == 0 ||  ((ws = workQueues) == null || (m = ws.length - 1) < 0)) {
            int ns = 0;
            rs = lockRunState();
            try {
                if ((rs & STARTED) == 0) {
                    // 通过原子操作,完成“任务窃取次数”这个计数器的初始化
                    U.compareAndSwapObject(this, STEALCOUNTER, null, new AtomicLong());
                    // create workQueues array with size a power of two
                    // 这段代码也非常有趣,详见后文的分析。
                    int p = config & SMASK; // ensure at least 2 slots
                    int n = (p > 1) ? p - 1 : 1;
                    n |= n >>> 1; n |= n >>> 2;  n |= n >>> 4;
                    n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1;
                    workQueues = new WorkQueue[n];
                    ns = STARTED;
                }
            } finally {
                unlockRunState(rs, (rs & ~RSLOCK) | ns);
            }
        }
        ......
    }
    

    externalPush方法中的“q = ws[m & r & SQMASK]”代码非常重要。我们大致来分析一下作者的意图,首先m是ForkJoinPool中的WorkQueue数组长度减1,例如当前WorkQueue数组大小为16,那么m的值就为15;r是一个线程独立的随机数生成器,关于java.util.concurrent.ThreadLocalRandom类的功能和使用方式可参见其它资料;而SQMASK是一个常量,值为126 (0x7e)。以下是一种可能的计算过程和计算结果:

    image.png

    实际上任何数和126进行“与”运算,其结果只可能是0或者偶数,即0 、 2 、 4 、 6 、 8。也就是说以上代码中从名为“ws”的WorkQueue数组中,取出的元素只可能是第0个或者第偶数个队列。

    我们再来看看以上代码给出的externalSubmit方法中,进行WorkQueue数组初始化的代码。当外部调用这通过submit、execute、invoke方法向ForkJoinPool提交一个计算任务时,会运行这段代码为ForkJoinPool创建多个WorkQueue并形成数组。其中以下代码片段用于确定这个即将创建的WorkQueue数组的大小

    ......
    // SMASK是一个常量
    static final int SMASK = 0xffff;
    ......
    // 这是config的来源
    // mode是ForkJoinPool构造函数中设定的asyncMode,如果为LIFO,则mode为0,否则为65536
    // parallelism 为技术人员设置的(或者程序自行设定的)并发等级
    this.config = (parallelism & SMASK) | mode;
    ......
    // ensure at least 2 slots
    int p = config & SMASK;
    // n这个变量就是要计算的WorkQueue数组的大小
    int n = (p > 1) ? p - 1 : 1;
    ......
    n |= n >>> 1; n |= n >>> 2;  n |= n >>> 4;
    n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1;
    ......
    

    从以上整理的代码可以看出,最后确认ForkJoinPool中WorkQueue数组初始化大小的因素是名叫config的变量,而config变量又与构造ForkJoinPool时所传入的并发等级(parallelism)、异步模式(asyncMode)有关。在我们选择LIFO模式时,计算结果如下表所示(下表中的结果建立在mode = 0的前提下):

    image.png

    是的,计算结果“n”按照两倍规模进行扩展,并且在初始化时保证和并发级别设定的数量(parallelism)至少两倍的关系。这是为什么呢?这是因为ForkJoinPool中的这些WorkQueue和工作线程ForkJoinWorkerThread并不是一对一的关系,而是随时都有多余ForkJoinWorkerThread数量的WorkQueue元素。而这个ForkJoinPool中的WorkQueue数组中,索引位为奇数的工作队列用于存储从外部提交到ForkJoinPool中的任务,也就是所谓的submissions queue;索引位为偶数的工作队列用于存储归并计算过程中等待处理的子任务,也就是task queue。

    image.png

    避免伪共享

    所谓伪共享是指多核CPU在无锁情况下同时抢占同一缓存行的写操作权限所引起的频繁切换操作。是不是不好理解这句话?那么我们就采用图文方式来说明一下伪共享产生的场景:


    image.png

    上图是目前典型的计算机硬件层组织结构(只画了北桥部分,南桥部分不在我们讨论范围内,就忽略了),主存和CPU之间由北桥芯片负责数据交换,北桥芯片和CPU间的通讯速度被称为前端总线速度(和芯片类型、总线频率等因素有关),虽然基于目前的技术基础,前端总线速度都比较快,但总的来说还是算比较珍贵的计算资源。CPU内部结构主要有计算内核、一级缓存(L1)、二级缓存(L2)和三级缓存(L3),其中L1缓存为每个计算内核所独享的,L2缓存是否由每个计算内核所独享视不同的CPU型号不同而有所区别,L3缓存一般为CPU内核锁共享。L3、L2、L1缓存的内部读写速度和制造成本依次递增,这就意味着它们的容量依次递减。以AMD 锐龙5 1400 这款CPU为例,这是一款4核民用级CPU,L1 数据缓存(L1 Data)大小为 4 × 32KBytes、L2 缓存 4 × 64KBytes、L3 缓存2 × 4MBytes。注意L1级缓存除了数据缓存以外还有独立的指令缓存(L1 Inst),另外AMD 锐龙5 1400这款CPU的L3缓存为每两核共享一组,所以会出现2 × 4MBytes的情况。

    为了避免CPU内核和相对缓慢的主存随时进行数据交互,CPU对在计算时对线程中数据的读写主要依靠这几级缓存进行,再通过前段总线将缓存中的数据和CPU数据进行同步。注意,真的只是相对而言的慢,DDR3/DDR4的内存速度还是很快的,DDR3-1600的读写速度可以达到 2.2 GB/s,而CPU L1缓存的数据读取速度可以达到 900 GB/s!

    CPU对于数据缓存的操作单位是缓存行,也就是说当进行CPU要对内存中的数据进行操作时,CPU会将内存地址和临近地址的数据全部装入缓存后,再进行操作(实际上现代计算机系统为了加快I/O性能,大量采用这样的操作原则,例如在“数据存储”专题讲到MySQL的InnoDB数据引擎从磁盘上读取数据到内存时,也是采用“Page”为单位进行的)。例如给出的CPU缓存行每一行都有64个字节(部分型号CPU的缓存行为32个字节),而一个长整型的大小为64位也就是8个字节,那么一个缓存行就可以容纳8个长整型(而实际上缓存行头部会使用8字节加载一些描述信息,所以实际上最多能存7个长整型)。

    如果工作在两个不同内核的两个线程(或者多个线程),需要对同一内存地址的数据进行写操作该怎么办呢?这里就要提到一个协议——MESI协议(可以参看https://en.wikipedia.org/wiki/MESI_protocol,或者这本书《What Every Programmer Should Know About Memory》)。简单的来说,如果CPU内核要对缓存行的数据进行写操作时,首先根据这个缓存行的状态,分析该缓存行的数据是否可能也存在于其它CPU内核的缓存行中(缓存行状态为S),如果有则发送一个RFO请求到其它CPU,让其它CPU设定缓存行的状态为Invalid(无效状态),这样就可以保证只有本CPU能够对缓存行的数据进行写操作。

    那么问题就很明显了,如果这些内核不停的相互请求对同一数据的写操作权限,就会出现资源抢占的情况,导致各个CPU内核L1和L2缓存全部失效,最后都不得不到L3甚至主存中重读数据。这都还算好比较好的情况,上文已经说过CPU对数据的操作以缓存行为单位,而一个缓存行可容纳多个数据(这里记为A数据和B数据),CPU内核1需要对数据A进行修改,同时CPU内核2需要对数据B进行修改。当这样的情况出现时,从技术人员角度来看好像并没有出现资源抢占,但实际上在多个CPU内核中发生的情况却发生了不必要的抢占。

    这就是伪共享,这个问题在Java中当然有解决办法,但是相对比较“暴力”。原理就是让A数据和B数据处于不同的缓存行——缓存行在存储A数据后多出来的空间,采用一些无用的基本数据进行“补全”。这样B数据就可以处于不同缓存行了,如下代码所示:

    ......
    // 这个是真实业务需要的属性,是一个长整型
    public volatile long a = 0L;
    // 需要补6个长整型
    public long a1, a2, a3, a4, a5, a6;
    
    //=================
    // 值得一提的是,有的CPU缓存航为64字节,所以为了兼容性更好一点
    // 你可以一直填充14个长整形
    public long a1, a2, a3, a4, a5, a6;
    public volatile long a = 0L;
    public long a7, a8, a9, a10, a11, a12, a13, a14;
    ......
    

    以上是Java代码的示例,是JDK1.7以及之前版本使用的一种解决方式。而JDK 1.8中有了一个更便利的解决方式,就是“@sun.misc.Contended”标记,而它的一个典型应用场景就在ForkJoinPool中:

    ....
    @sun.misc.Contended
    public class ForkJoinPool extends AbstractExecutorService {
        ....
    }
    ....
    

    ForkJoinPool工作监控

    ForkJoinPool重写了toString()方法,以便技术人员在代码调试或者其它需要临时监控ForkJoinPool运行情况的场景下,轻松获取ForkJoinPool中的主要工作状态。以下运行效果展示了ForkJoinPool类的toString()方法打印的情况:

    **********************
    @2503dbd3[Running, parallelism = 6, size = 9, active = 7, running = 7, steals = 14, tasks = 224, submissions = 0]
    **********************
    @2503dbd3[Running, parallelism = 6, size = 9, active = 7, running = 7, steals = 14, tasks = 213, submissions = 0]
    **********************
    @2503dbd3[Running, parallelism = 6, size = 9, active = 7, running = 7, steals = 14, tasks = 164, submissions = 0]
    **********************
    @2503dbd3[Running, parallelism = 6, size = 9, active = 9, running = 9, steals = 22, tasks = 281, submissions = 0]
    **********************
    @2503dbd3[Running, parallelism = 6, size = 9, active = 8, running = 8, steals = 22, tasks = 212, submissions = 0]
    **********************
    @2503dbd3[Running, parallelism = 6, size = 9, active = 7, running = 7, steals = 22, tasks = 192, submissions = 0]
    **********************
    
    • parallelism:当前ForkJoinPool设定的并行级别
    • size:当前ForkJoinPool线程池内部的所有线程数量,这些线程可能处于阻塞状态(使用join方法引起的阻塞或者任务中其它会引起线程阻塞方法引起的阻塞),可能处于运行状态。
    • active:当前线程池内部,正在进行compute计算的线程(这些线程不代表没有被阻塞)。
    • running:当前线程池内部,正在进行compute计算并且没有被任何阻塞线程阻塞机制所影响的线程数量
    • steals:当前ForkJoinPool线程池内部各个work queue间发生的“工作窃取”操作的总次数。
    • tasks:当前ForkJoinPool线程池内部各个work queue中等待处理的子任务总数量。
    • submissions:通过submit方式或者其它方式提交到ForkJoinPool中,准备进行归并计算的但是ForkJoinPool还没有开始处理的任务(ForkJoinTask任务或者其子任务)数量。

    相关文章

      网友评论

          本文标题:Fork/Join框架分析

          本文链接:https://www.haomeiwen.com/subject/sitocdtx.html