美文网首页一些收藏
Java并发编程——ForkJoinPool之WorkQueue

Java并发编程——ForkJoinPool之WorkQueue

作者: 小波同学 | 来源:发表于2022-09-17 03:08 被阅读0次

    一、ForkJoinPool

    ForkJoinPool 是 JDK7 引入的,由 Doug Lea 编写的高性能线程池。核心思想是将大的任务拆分成多个小任务(即fork),然后在将多个小任务处理汇总到一个结果上(即join),非常像MapReduce处理原理。同时,它提供基本的线程池功能,支持设置最大并发线程数,支持任务排队,支持线程池停止,支持线程池使用情况监控,也是AbstractExecutorService的子类,主要引入了“工作窃取”机制,在多CPU计算机上处理性能更佳。其广泛用在java8的stream中。

    从图中可以看出ForkJoinPool要先执行完子任务才能执行上一层任务,所以ForkJoinPool适合在有限的线程数下完成有父子关系的任务场景,比如:快速排序,二分查找,矩阵乘法,线性时间选择等场景,以及数组和集合的运算。

    Fork/Join Pool采用优良的设计、代码实现和硬件原子操作机制等多种思路保证其执行性能。其中包括(但不限于):计算资源共享、高性能队列、避免伪共享、工作窃取机制等。

    二、与ThreadPoolExecutor原生线程池的区别

    ForkJoinPool和ThreadPoolExecutor都实现了Executor和ExecutorService接口,都可以通过构造函数设置线程数,threadFactory,可以查看ForkJoinPool.makeCommonPool()方法的源码查看通用线程池的构造细节。

    在内部结构上我觉得两个线程池最大的区别是在工作队列的设计上,如下图

    ThreadPoolExecutor:

    ForkJoinPool:

    图上细节画的不严谨,但大致能看出区别:

    • ForkJoinPool每个线程都有自己的队列
    • ThreadPoolExecutor共用一个队列

    使用ForkJoinPool可以在有限的线程数下来完成非常多的具有父子关系的任务,比如使用4个线程来完成超过2000万个任务。

    ForkJoinPool最适合计算密集型任务,而且最好是非阻塞任务,之前的一篇文章:Java踩坑记系列之线程池 也说了线程池的不同使用场景和注意事项。

    所以ForkJoinPool是ThreadPoolExecutor线程池的一种补充,是对计算密集型场景的加强。

    三、工作窃取的实现原理

    ForkJoinPool类中的WorkQueue正是实现工作窃取的队列,javadoc中的注释如下:

    大意是大多数操作都发生在工作窃取队列中(在嵌套类工作队列中)。这些是特殊形式的Deques,主要有push,pop,poll操作。

    Deque是双端队列(double ended queue缩写),头部和尾部任何一端都可以进行插入,删除,获取的操作,即支持FIFO(队列)也支持LIFO(栈)顺序。

    Deque接口的实现最常见的是LinkedList,除此还有ArrayDeque、ConcurrentLinkedDeque等。

    工作窃取模式主要分以下几个步骤:

    • 1、每个线程都有自己的双端队列。
    • 2、当调用fork方法时,将任务放进队列头部,线程以LIFO顺序,使用push/pop方式处理队列中的任务。
    • 3、如果自己队列里的任务处理完后,会从其他线程维护的队列尾部使用poll的方式窃取任务,以达到充分利用CPU资源的目的。
    • 4、从尾部窃取可以减少同原线程的竞争。
    • 5、当队列中剩最后一个任务时,通过cas解决原线程和窃取线程的竞争。

    流程大致如下所示:


    工作窃取便是ForkJoinPool线程池的优势所在,在一般的线程池比如ThreadPoolExecutor中,如果一个线程正在执行的任务由于某种原因无法继续运行,那么该线程会处于等待状态,包括singleThreadPool、fixedThreadPool、cachedThreadPool这几种线程池。

    而在ForkJoinPool中,那么线程会主动寻找其他尚未被执行的任务然后窃取过来执行,减少线程等待时间。

    JDK8中的并行流(parallelStream)功能是基于ForkJoinPool实现的,另外还有java.util.concurrent.CompletableFuture异步回调future,内部使用的线程池也是ForkJoinPool。

    四、ForkJoinPool分析

    4.1 ForkJoinPool成员变量

    // 用来配置ctl在控制线程数量使用
    private static final long ADD_WORKER = 0x0001L << (TC_SHIFT + 15); // sign
    
    //控制线程池数量(ctl & ADD_WORKER) != 0L 时创建线程,
    // 也就是当ctl的第16位不为0时,可以继续创建线程
    volatile long ctl;                   // main pool control
    
    //全局锁控制,全局运行状态
    volatile int runState;               // lockable status
    
    //config二进制形式的低16位表示parallelism,
    //config二进制形式的第高16位表示mode,1表示异步模式, 使用先进先出队列, 0表示同步模式, 使用先进后出栈
    //低16位表示workerQueue在pool中的索引,高16位表示mode, 有FIFI LIFL 
    final int config;  // parallelism, mode   
     
    //生成workerQueue索引的重要依据
    int indexSeed;         // to generate worker index  
    
    //工作者队列数组,内部线程ForkJoinWorkerThread启动时会注册一个WorkerQueue对象到这个数组中
    volatile WorkQueue[] workQueues;     // main registry 
    
    //工作者线程线程工厂,创建ForkJoinWorkerThread的策略
    final ForkJoinWorkerThreadFactory factory;  
    
    //在线程因未捕异常而退出时,java虚拟机将回调的异常处理策略
    final UncaughtExceptionHandler ueh;  // per-worker UEH 
    
    //工作者线程名的前缀
    final String workerNamePrefix;       // to create worker name string  
    
    //执行器所有线程窃取的任务总数,也作为监视runState的锁
    volatile AtomicLong stealCounter;    // also used as sync monitor
    
    //通用的执行器,它在静态块中初始化
    static final ForkJoinPool common; 
    

    五、WorkQueue

    5.1 类结构及其成员变量

    5.1.1 类结构和注释

    WorkQueue是ForkJoinPool的核心内部类,是一个Contented修饰的静态内部类。

    /**
     * Queues supporting work-stealing as well as external task
     * submission. See above for descriptions and algorithms.
     * Performance on most platforms is very sensitive to placement of
     * instances of both WorkQueues and their arrays -- we absolutely
     * do not want multiple WorkQueue instances or multiple queue
     * arrays sharing cache lines. The @Contended annotation alerts
     * JVMs to try to keep instances apart.
     */
    @sun.misc.Contended
    static final class WorkQueue {
    }
    

    其注释大意为:
    workQUeue是一个支持任务窃取和外部提交任务的队列,其实现参考ForkJoinPool描述的算法。在大多数平台上的性能对工作队列及其数组的实例都非常敏感。我们不希望多个工作队列的实例和多个队列数组共享缓存。@Contented注释用来提醒jvm将workQueue在执行的时候与其他对象进行区别。

    @Contented,实际上就是采用内存对齐的方式避免伪共享,保证WorkQueue在执行的时候,其前后不会有其他对象干扰。

    注:JVM 添加 -XX:-RestrictContended 参数后 @sun.misc.Contended 注解才有效)

    5.1.2 MAXIMUM_QUEUE_CAPACITY

    MAXIMUM_QUEUE_CAPACITY注释如下:

    /**
     * Maximum size for queue arrays. Must be a power of two less
     * than or equal to 1 << (31 - width of array entry) to ensure
     * lack of wraparound of index calculations, but defined to a
     * value a bit less than this to help users trap runaway
     * programs before saturating systems.
     */
    static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M
    

    MAXIMUM_QUEUE_CAPACITY是队列支持的最大容量,必须是2的幂小于或等于1<<(31-数组项的宽度),但定义为一个略小于此值的值,以帮助用户在饱和系统之前捕获失控的程序。

    5.1.3 成员变量

    成员变量区如下:

    @sun.misc.Contended
    static final class WorkQueue {
    
        //队列的初始容量
        static final int INITIAL_QUEUE_CAPACITY = 1 << 13;
    
        // 64M 队列的最大容量
        static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M
    
        // Instance fields
        volatile int scanState;    // versioned, <0: inactive; odd:scanning
        int stackPred;             // pool stack (ctl) predecessor
        int nsteals;               // number of steals
        int hint;                  // randomization and stealer index hint
        int config;                // pool index and mode
        volatile int qlock;        // 1: locked, < 0: terminate; else 0
        volatile int base;         // index of next slot for poll
        int top;                   // index of next slot for push
        ForkJoinTask<?>[] array;   // the elements (initially unallocated)
        final ForkJoinPool pool;   // the containing pool (may be null)
        final ForkJoinWorkerThread owner; // owning thread or null if shared
        volatile Thread parker;    // == owner during call to park; else null
        volatile ForkJoinTask<?> currentJoin;  // task being joined in awaitJoin
        volatile ForkJoinTask<?> currentSteal; // mainly used by helpStealer
    }
    
    • scanState:它可以看作是乐观锁的版本号,另外它还有此其他功能,它为负数时,表示工作者线程非活动,它为奇数是表示,正在扫描(准备窃取)任务,它为偶数是表示正在执行任务。

    • stackPred:表示在线程池栈当前工作线程的前驱线程的索引。在唤醒线程时常用到此属性。

    • nsteals:表示owner线程窃取的任务数。

    • hint:任务窃取时的随机定位种子。

    • config:低16位表示,当前WorkerQueue对象在外部类的数组属性workQueues中的索引(下标) 。高16位表示当前WorkerQueue对象的模式。对于内部任务,若构造方法配置为异步模式就将WorkQueue当作先进先出的队列,反之将WorkQueue当作后进先出的栈。对于外部任务,将WorkQueue视为共享队列。

    • qlock:初始值为0,”=1“时表示当前WorkerQueue对象被锁住,” < 0“时 表示当前WorkerQueue对象已终止,队列中的其他未完成任务将不再被执行。

    • base:表示下次对任务数组array进行poll出队操作(窃取任务)的槽位索引(队尾)。

    • top:表示下次任务数组array进行push入栈操作(添加任务)的槽位索引(栈顶)。

    • array:非学重要的属性,这用是保存任务的数组(容器)。

    • pool:与之关联的ForkJoinPool执行器,它可能为空。若为空,就使用静态变量common作为执行器。

    • owner:当前队列对应的工作者线程,它一般不为空。若从外部提交任务时,当前WorkerQueue对象表示共享队列,owner为空。

    • parker:阻塞的线程。在被阻塞的时候,它等于owner,其他时候它为空。

    • currentJoin:表示当前正在join的任务,主要在awaitJoin方法使用。

    • currentSteal:表示当前被窃取的任务,主要在helpStealer方法中使用。

    5.2 构造函数

    WorkQueue就一个构造函数:

    WorkQueue(ForkJoinPool pool, ForkJoinWorkerThread owner) {
        this.pool = pool;
        this.owner = owner;
        // Place indices in the center of array (that is not yet allocated)
        base = top = INITIAL_QUEUE_CAPACITY >>> 1;
    }
    

    在这个构造函数中,只会指定pool和owoner,如果该队列是共享队列,那么owoner此时是空的。此外,base和top两个指针分别都指向了数组的中值,这个值是初始化容量右移一位。

    那么结合前面的代码,实际上初始化的时候,数组的长度为8192,那么base=top=4096。

    这个数组在构造函数被调用之后初始化如下:


    5.3 重要的方法

    5.3.1 push

    当ForkJoinWorkerThread需要向双端队列中放入一个新的待执行子任务时,会调用WorkQueue中的push方法。来看看这个方法的主要执行过程(请注意,源代码来自JDK1.8,它和JDK1.7中的实现有显著不同):

    /**
     * Pushes a task. Call only by owner in unshared queues.  (The
     * shared-queue version is embedded in method externalPush.)
     *
     * @param task the task. Caller must ensure non-null.
     * @throws RejectedExecutionException if array cannot be resized
     */
    final void push(ForkJoinTask<?> task) {
        ForkJoinTask<?>[] a; ForkJoinPool p;
        int b = base, s = top, n;
        // 请注意,在执行task.fork时,触发push情况下,array不会为null
        // 因为在这之前workqueue中的array已经完成了初始化(在工作线程初始化时就完成了)    
        if ((a = array) != null) {    // ignore if queue removed
            //m为最高为位置的index
            int m = a.length - 1;     // fenced write for task visibility
            // U常量是java底层的sun.misc.Unsafe操作类
            // 这个类提供硬件级别的原子操作
            // putOrderedObject方法在指定的对象a中,指定的内存偏移量的位置,赋予一个新的元素      
            U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);
            // putOrderedInt方法对当前指定的对象中的指定字段,进行赋值操作
            // 这里的代码意义是将workQueue对象本身中的top标示的位置 + 1,    
            U.putOrderedInt(this, QTOP, s + 1);
            //如果n小于等于1则 且poll不为空 则触发worker窃取或者产生新的worker
            if ((n = s - b) <= 1) {
                if ((p = pool) != null)
                    // signalWork方法的意义在于,在当前活动的工作线程过少的情况下,创建新的工作线程
                    p.signalWork(p.workQueues, this);
            }
            //如果n大于等于了m 则说明需要扩容了, array的剩余空间不够了
            else if (n >= m)
                growArray();
        }
    }
    

    这个push方法是提供给工作队列自己push任务来使用的,共享队列push任务是在外部externalPush和externalSubmit等方法来进行初始化和push。

    这里需要注意的是,当队列中的任务数小于1的时候,才会调用signalWork,这个地方一开始并不理解,实际上,我们需要注意的是,这个方法是专门提供给工作队列来使用的,那么这个条件满足的时候,说明工作队列空闲。如果这个条件不满足,那么工作队列中有很多任务需要工作队列来处理,就不会触发对这个队列的窃取操作。

    5.3.2 growArray

    这是扩容的方法。实际上这个方法有两个作用,首先是初始化,其次是判断,是否需要扩容,如果需要扩容则容量加倍。

    /**
     * Initializes or doubles the capacity of array. Call either
     * by owner or with lock held -- it is OK for base, but not
     * top, to move while resizings are in progress.
     */
    final ForkJoinTask<?>[] growArray() {
        //旧的数组 oldA
        ForkJoinTask<?>[] oldA = array;
        //如果oldA不为空,则size就为oldA的长度*2,反之说明数组没有被初始化,那么长度就应该为初始化的长度8192
        int size = oldA != null ? oldA.length << 1 : INITIAL_QUEUE_CAPACITY;
        //如果size比允许的最大容量还大,那么此时会抛出异常
        if (size > MAXIMUM_QUEUE_CAPACITY)
            throw new RejectedExecutionException("Queue capacity exceeded");
        int oldMask, t, b;
        //array a 为根据size new出来的一个新的数组
        ForkJoinTask<?>[] a = array = new ForkJoinTask<?>[size];
        //如果oldA不为空且其长度大于等于0为有效数组,且top-base大于0 说明不为空
        if (oldA != null && (oldMask = oldA.length - 1) >= 0 &&
            (t = top) - (b = base) > 0) {
            //按size定义掩码
            int mask = size - 1;
            //从旧的数组中poll全部task,然后push到新的array中
            do { // emulate poll from old array, push to new array
                ForkJoinTask<?> x;
                //采用unsafe操作
                int oldj = ((b & oldMask) << ASHIFT) + ABASE;
                int j    = ((b &    mask) << ASHIFT) + ABASE;
                //实际上直接进行的内存对象copy,这样效率比循环调用push和poll要高很多
                x = (ForkJoinTask<?>)U.getObjectVolatile(oldA, oldj);
                //判断  x不为空 则使用unsafe进行操作
                if (x != null &&
                    U.compareAndSwapObject(oldA, oldj, x, null))
                    U.putObjectVolatile(a, j, x);
            } while (++b != t);
        }
        //返回新的数组
        return a;
    }
    

    需要注意的是,这个方法一旦调用进行扩容之后,无论是来自于外部push操作触发,还是有工作线程worker触发,都将被锁定,之后,不能移动top指针,但是base指针是可以移动的。这也就是说,一旦处于扩容的过程中,就不能新增task,但是可以从base进行消费,这就只支持FIFO。因此同步模式将在此时被阻塞。

    5.3.3 pop

    同样,pop操作也仅限于工作线程,对于共享对立中则不允许使用pop方法。这个方法将按LIFO后进先出的方式从队列中。

    /**
     * Takes next task, if one exists, in LIFO order.  Call only
     * by owner in unshared queues.
     */
    final ForkJoinTask<?> pop() {
        ForkJoinTask<?>[] a; ForkJoinTask<?> t; int m;
        //如果array不为空切长度大于0
        if ((a = array) != null && (m = a.length - 1) >= 0) {
            //循环,s为top的指针减1,即top减1之后要大于0 也就是说要存在task
            for (int s; (s = top - 1) - base >= 0;) {
                //计算unsafe的偏移量 得到s的位置
                long j = ((m & s) << ASHIFT) + ABASE;
                //如果这个索引处的对象为空,则退出
                if ((t = (ForkJoinTask<?>)U.getObject(a, j)) == null)
                    break;
                //反之用usafe的方法将这个值取走,之后返回,并更新top的指针
                if (U.compareAndSwapObject(a, j, t, null)) {
                    U.putOrderedInt(this, QTOP, s);
                    return t;
                }
            }
        }
        return null;
    }
    

    pop方法,这是仅限于owoner调用的方法,将从top指针处取出task。这个方法对于整个队列是LIFO的方式。

    5.3.4 poll

    poll方法将从队列中按FIFO的方式取出task。

    /**
     * Takes next task, if one exists, in FIFO order.
     */
    final ForkJoinTask<?> poll() {
        ForkJoinTask<?>[] a; int b; ForkJoinTask<?> t;
        //判断 base-top小于0说明存在task 切array不为空
        while ((b = base) - top < 0 && (a = array) != null) {
            //计算出unsafe操作的索引 实际上就是拿到b
            int j = (((a.length - 1) & b) << ASHIFT) + ABASE;
            //之后拿到这个task 用volatile的方式
            t = (ForkJoinTask<?>)U.getObjectVolatile(a, j);
            //之后如果base和b相等
            if (base == b) {
               //如果拿到的task不为空
                if (t != null) {
                    //那么将这个位置的元素移除 base+1 然后返回t
                    if (U.compareAndSwapObject(a, j, t, null)) {
                        base = b + 1;
                        return t;
                    }
                }
                //在上述操作之后,如果base比top小1说明已经为空了 直接退出循环
                else if (b + 1 == top) // now empty
                    break;
            }
        }
        //默认返回null
        return null;
    }
    

    5.3.5 pollAt

    这个方法将采用FIFO的方式,从 队列中获得task。

    /**
     * Takes a task in FIFO order if b is base of queue and a task
     * can be claimed without contention. Specialized versions
     * appear in ForkJoinPool methods scan and helpStealer.
     */
    final ForkJoinTask<?> pollAt(int b) {
        ForkJoinTask<?> t; ForkJoinTask<?>[] a;
        //数组不为空
        if ((a = array) != null) {
            //计算索引b的位置
            int j = (((a.length - 1) & b) << ASHIFT) + ABASE;
            //如果此处的task不为空,则将此处置为null然后将对象task返回
            if ((t = (ForkJoinTask<?>)U.getObjectVolatile(a, j)) != null &&
                base == b && U.compareAndSwapObject(a, j, t, null)) {
                base = b + 1;
                return t;
            }
        }
        return null;
    }
    

    通常情况下,b指的是队列的base指针。那么从底部获取元素就能实现FIFO。特殊的版本出现在scan和helpStealer中用于对工作队列的窃取操作的实现。

    5.3.6 nextLocalTask

    /**
     * Takes next task, if one exists, in order specified by mode.
     */
    final ForkJoinTask<?> nextLocalTask() {
        return (config & FIFO_QUEUE) == 0 ? pop() : poll();
    }
    

    这个方法中对之前的MODE会起作用,如果是FIFO则用pop方法,反之则用poll方法获得下一个task。

    5.3.7 peek

    /**
     * Returns next task, if one exists, in order specified by mode.
     */
    final ForkJoinTask<?> peek() {
        ForkJoinTask<?>[] a = array; int m;
        //判断数组的合法性
        if (a == null || (m = a.length - 1) < 0)
            return null;
        //根据mode决定从top还是base处获得task
        int i = (config & FIFO_QUEUE) == 0 ? top - 1 : base;
        int j = ((i & m) << ASHIFT) + ABASE;
        //返回获得的task
        return (ForkJoinTask<?>)U.getObjectVolatile(a, j);
    }
    

    peek则根据之前的mode定义,从队列的前面或者后面取得task。

    5.3.8 tryUnpush

    /**
     * Pops the given task only if it is at the current top.
     * (A shared version is available only via FJP.tryExternalUnpush)
    */
    final boolean tryUnpush(ForkJoinTask<?> t) {
        ForkJoinTask<?>[] a; int s;
        //判断数组的合法性
        if ((a = array) != null && (s = top) != base &&
            //将top位置的task与t比较,如果相等则将其改为null
            U.compareAndSwapObject
            (a, (((a.length - 1) & --s) << ASHIFT) + ABASE, t, null)) {
            //将top减1
            U.putOrderedInt(this, QTOP, s);
            //返回操作成功
            return true;
        }
        //默认返回失败
        return false;
    }
    

    这个方法是将之前push的任务撤回。这个操作仅仅只有task位于top的时候操能成功。

    5.3.9 runTask

    在之前的文章分析外部提交task的时候,就提到了这个方法。实际上是runWorker调用的。
    也就是说,线程在启动之后,一旦worker获取到task,就会运行。

    /**
     * Executes the given task and any remaining local tasks.
     */
    final void runTask(ForkJoinTask<?> task) {
        //task不为空
        if (task != null) {
            //扫描状态标记为busy 那么说明当前的worker正在处理本地任务   此时这个操作会将scanState改为0
            scanState &= ~SCANNING; // mark as busy
            //执行这个task
            (currentSteal = task).doExec();
            //释放已执行任务的内存
            U.putOrderedObject(this, QCURRENTSTEAL, null); // release for GC
            //执行其他本地的task
            execLocalTasks();
            ForkJoinWorkerThread thread = owner;
            //增加增加steals的次数
            if (++nsteals < 0)      // collect on overflow
                transferStealCount(pool);
            //将scanState改为1 这样就变得活跃可以被其他worker scan
            scanState |= SCANNING;
            //如果thread不为null说明为worker线程 则调用后续的exec方法
            if (thread != null)
                thread.afterTopLevelExec();
        }
    }
    

    5.3.10 execLocalTasks

    调用这个方法,运行队列中的全部task,如果采用了LIFO模式,则调用pollAndExecAll,这是另外一种实现方法。直到将队列都执行到empty

    /**
     * Removes and executes all local tasks. If LIFO, invokes
     * pollAndExecAll. Otherwise implements a specialized pop loop
     * to exec until empty.
     */
    final void execLocalTasks() {
        int b = base, m, s;
        //拿到数组
        ForkJoinTask<?>[] a = array;
        //如果b-s小于0说明存在task,a不为空,切a的长度大于0 这均是检测方法的合法性
        if (b - (s = top - 1) <= 0 && a != null &&
            (m = a.length - 1) >= 0) {
            //如果没有采用FIFO的mode  那么一定是LIFO 则从top处开始
            if ((config & FIFO_QUEUE) == 0) {
               //开始循环
                for (ForkJoinTask<?> t;;) {
                   //从top开始取出task
                    if ((t = (ForkJoinTask<?>)U.getAndSetObject
                         (a, ((m & s) << ASHIFT) + ABASE, null)) == null)
                        break;
                    //修改top
                    U.putOrderedInt(this, QTOP, s);
                    //执行task
                    t.doExec();
                    //如果没有任务的了 则退出
                    if (base - (s = top - 1) > 0)
                        break;
                }
            }
            else
               //FIFO的方式调用pollAndExecAll
                pollAndExecAll();
        }
    }
    

    5.3.11 pollAndExecAll

    此方法将用poll,FIFO的方式获得task并执行。

    final void pollAndExecAll() {
        for (ForkJoinTask<?> t; (t = poll()) != null;)
            t.doExec();
    }
    

    可见,当通过workQueue中调用runTask的方法的时候,会将这个队列的scanState状态修改为0,之后将这个队列中的全部task根据定义的mode全部消费完毕。

    5.3.12 tryRemoveAndExec

    从注释中可知,这个方法仅仅供awaitJoin方法调用,在await的过程中,将task从workQueue中移除并执行。

    /**
     * If present, removes from queue and executes the given task,
     * or any other cancelled task. Used only by awaitJoin.
     *
     * @return true if queue empty and task not known to be done
     */
    final boolean tryRemoveAndExec(ForkJoinTask<?> task) {
        ForkJoinTask<?>[] a; int m, s, b, n;
        //判断数组的合法性 task不能为空
        if ((a = array) != null && (m = a.length - 1) >= 0 &&
            task != null) {
            //循环  n为task的数量,必须大于0
            while ((n = (s = top) - (b = base)) > 0) {
                //死循环 从top遍历到base
                for (ForkJoinTask<?> t;;) {      // traverse from s to b
                    long j = ((--s & m) << ASHIFT) + ABASE;
                    if ((t = (ForkJoinTask<?>)U.getObject(a, j)) == null)
                        return s + 1 == top;     // shorter than expected
                    //如果task处于top位置
                    else if (t == task) {
                        boolean removed = false;
                        if (s + 1 == top) {      // pop
                            //pop的方式获取task  然后替换为null
                            if (U.compareAndSwapObject(a, j, task, null)) {
                                U.putOrderedInt(this, QTOP, s);
                                removed = true;
                            }
                        }
                        //用emptytask代替
                        else if (base == b)      // replace with proxy
                            removed = U.compareAndSwapObject(
                                a, j, task, new EmptyTask());
                        //如果remove成功 则执行这个task
                        if (removed)
                            task.doExec();
                        break;
                    }
                    //如果task的status为负数 切 top=s=1
                    else if (t.status < 0 && s + 1 == top) {
                        //移除
                        if (U.compareAndSwapObject(a, j, t, null))
                            U.putOrderedInt(this, QTOP, s);
                        break;                  // was cancelled
                    }
                    if (--n == 0)
                        return false;
                }
                if (task.status < 0)
                    return false;
            }
        }
        return true;
    }
    

    5.3.13 popCC

    如果pop CountedCompleter。这方法支持共享和worker的队列,但是仅仅通过helpComplete调用。
    CountedCompleter是jdk1.8中新增的一个ForkJoinTask的一个实现类。

    /**
     * Pops task if in the same CC computation as the given task,
     * in either shared or owned mode. Used only by helpComplete.
     */
    final CountedCompleter<?> popCC(CountedCompleter<?> task, int mode) {
        int s; ForkJoinTask<?>[] a; Object o;
        //判断队列数组合法性
        if (base - (s = top) < 0 && (a = array) != null) {
            //从top处开始
            long j = (((a.length - 1) & (s - 1)) << ASHIFT) + ABASE;
            //如果获的的task不为null
            if ((o = U.getObjectVolatile(a, j)) != null &&
                //且为CountedCompleter对象
                (o instanceof CountedCompleter)) {
                //转换为CountedCompleter
                CountedCompleter<?> t = (CountedCompleter<?>)o;
                //死循环
                for (CountedCompleter<?> r = t;;) {
                    //如果task与获得的r相等为同一对象
                    if (r == task) {
                        //如果mode小于0 
                        if (mode < 0) { // must lock
                             //cas的方式加锁
                            if (U.compareAndSwapInt(this, QLOCK, 0, 1)) {
                                //将这个对象清除 并修改top后解锁
                                if (top == s && array == a &&
                                    U.compareAndSwapObject(a, j, t, null)) {
                                    U.putOrderedInt(this, QTOP, s - 1);
                                    U.putOrderedInt(this, QLOCK, 0);
                                    //返回t
                                    return t;
                                }
                                //解锁
                                U.compareAndSwapInt(this, QLOCK, 1, 0);
                            }
                        }
                        else if (U.compareAndSwapObject(a, j, t, null)) {
                            U.putOrderedInt(this, QTOP, s - 1);
                            return t;
                        }
                        break;
                    }
                    else if ((r = r.completer) == null) // try parent
                        break;
                }
            }
        }
        return null;
    }
    

    5.3.14 pollAndExecCC

    pollAndExecCC 。窃取并运行与给定任务相同CountedCompleter计算任务(如果存在),并且可以在不发生争用的情况下执行该任务。否则,返回一个校验和/控制值,供helpComplete方法使用。

    /**
     * Steals and runs a task in the same CC computation as the
     * given task if one exists and can be taken without
     * contention. Otherwise returns a checksum/control value for
     * use by method helpComplete.
     *
     * @return 1 if successful, 2 if retryable (lost to another
     * stealer), -1 if non-empty but no matching task found, else
     * the base index, forced negative.
     */
    final int pollAndExecCC(CountedCompleter<?> task) {
        int b, h; ForkJoinTask<?>[] a; Object o;
        //判断array的合法性
        if ((b = base) - top >= 0 || (a = array) == null)
            h = b | Integer.MIN_VALUE;  // to sense movement on re-poll
        else {
            //从base开始获得task
            long j = (((a.length - 1) & b) << ASHIFT) + ABASE;
            if ((o = U.getObjectVolatile(a, j)) == null)
                h = 2;                  // retryable
            else if (!(o instanceof CountedCompleter))
                h = -1;                 // unmatchable
            else {
                CountedCompleter<?> t = (CountedCompleter<?>)o;
                //死循环
                for (CountedCompleter<?> r = t;;) {
                    if (r == task) {
                        if (base == b &&
                            U.compareAndSwapObject(a, j, t, null)) {
                            base = b + 1;
                            t.doExec();
                            h = 1;      // success
                        }
                        else
                            h = 2;      // lost CAS
                        break;
                    }
                    else if ((r = r.completer) == null) {
                        h = -1;         // unmatched
                        break;
                    }
                }
            }
        }
        return h;
    }
    

    externalPush方法中的“q = ws[m & r & SQMASK]”代码非常重要。我们大致来分析一下作者的意图,首先m是ForkJoinPool中的WorkQueue数组长度减1,例如当前WorkQueue数组大小为16,那么m的值就为15;r是一个线程独立的随机数生成器,关于java.util.concurrent.ThreadLocalRandom类的功能和使用方式可参见其它资料;而SQMASK是一个常量,值为126 (0x7e)。以下是一种可能的计算过程和计算结果:


    实际上任何数和126进行“与”运算,其结果只可能是0或者偶数,即0、2、4、6、8。也就是说以上代码中从名为“ws”的WorkQueue数组中,取出的元素只可能是第0个或者第偶数个队列。

    结论就是偶数是外部任务,奇数是需要拆解合并的任务。

    ForkJoinWorkerThread需要从双端队列中取出下一个待执行子任务,就会根据设定的asyncMode调用双端队列的不同方法,代码概要如下所示:

    final ForkJoinTask<?> nextTaskFor(WorkQueue w) {
        for (ForkJoinTask<?> t;;) {
            WorkQueue q; int b;
            // 该方法试图从“w”这个队列获取下一个待处理子任务
            if ((t = w.nextLocalTask()) != null)
                return t;
            // 如果没有获取到,则使用findNonEmptyStealQueue方法
            // 随机得到一个元素非空,并且可以进行任务窃取的存在于ForkJoinPool中的其它队列
            // 这个队列被记为“q”   
            if ((q = findNonEmptyStealQueue()) == null)
                return null;
            // 试图从“q”这个队列base位处取出待执行任务  
            if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null)
                return t;
        }
    }
    

    六、总结

    本文对workQueue的源码进行了分析,我们需要注意的是,对于workQueue,定义了三个操作,分别是push,poll和pop。

    • push

    主要是操作top指针,将top进行移动。

    • poll
      如果top和base不等,则说明队列有值,可以消费,那么poll就从base指针处开始消费。这个方法实现了队列的FIFO。

    消费之后对base进行移动。

    • pop
      同样,还可以从top开始消费,这就是pop。这个方法实际上实现了对队列的LIFO。

    消费之后将top减1。

    以上就是这三个方法对应的操作。但是我们还需要注意的是,在所有的unsafe操作中,通过cas进行设置或者获得task的时候,还有一个掩码。这个非常重要。
    我们可以看在push方法中:

     int m = a.length - 1;
     U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);
    

    在扩容的方法growArray中我们可以知道。每次扩容都是采用左移的方式来进行,这样就保证了数组的长度为2的幂。

    在这里,m=a.length-1,那就说明,m实际上其二进制格式将会有效位都为1,这个数字就可以做为掩码。当m再与s取&计算的时候。可以想象,s大于m的部分将被去除,只会保留比m小的部分。那么实际上,这就等价于,当我们一直再push元素到数组中的时候,实际上就从数组的索引底部开始:


    参考上面这个过程,也就是说,实际上这个数组,base和top实际指向的index并不重要。只有二者的相对位移才是重要的。这有点类似与RingBuffer的数据结构,但是还是有所不同。也就是说这个数组实际上是不会被浪费的。之前有很多不理解的地方,为什么top减去base可能出现负数。那么这样实际上就会导致负数的产生。

    这样的话,如果我们采用异步模式,asyncMode为true的时候,workQueue则会采用FIFO_QUEUE的model,这样workQueue本身就使用的时poll方法。反之如果使用LIFO_QUEUE的同步模式,则workQueue使用pop方法。默认情况下采用同步模式。同步的时候workQueue的指针都围绕在数组的初始化的中间位置波动。而共享队列则会一直循环。

    至此,我们分析了workQueue的源码,对其内部实现的双端队列本身的操作进行了分析。为什么作者会自己实现一个Deque,而不是使用juc中已存在的容器。这就是因为这个队列全程都是采用Unsafe来实现的,在开篇作者也说了,需要@Contented修饰,就是为了避免缓存的伪代共享。这样来实现一个高效的Deque,以供ForkJoinPool来操作。
    这与学习ConcurrentHashMap等容器的源码一样,可以看出作者为了性能的优化,采用了很多独特的方式来实现。这些地方都是我们值得学习和借鉴之处。这也是ForkJoin性能高效的关键。在作者的论文中也可以看出,java的实现,由于抽象在jvm之上,性能比c/c++的实现要低很多。这也是作者尽可能将性能做到最优的原因之一。

    参考:
    https://blog.csdn.net/Xiaowu_First/article/details/122407019

    https://blog.csdn.net/tyrroo/article/details/81483608

    https://www.cnblogs.com/juniorMa/articles/14234472.html

    https://www.cnblogs.com/maoyx/p/13991828.html

    https://blog.csdn.net/dhaibo1986/article/details/108801254

    相关文章

      网友评论

        本文标题:Java并发编程——ForkJoinPool之WorkQueue

        本文链接:https://www.haomeiwen.com/subject/yfpsortx.html