ForkjoinPool -1

作者: 程序员札记 | 来源:发表于2022-03-15 09:49 被阅读0次

    ForkJoin是用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。Fork就是把一个大任务切分为若干子任务并行的执行,Join就是合并这些子任务的执行结果,最后得到这个大任务的结果。


    image.png

    下面是一个是一个简单的Join/Fork计算过程,将1—1001数字相加

    package com.ebay.concurrent.threadpool;
    
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.ForkJoinTask;
    import java.util.concurrent.RecursiveTask;
    
    /**
     * 这是一个简单的Join/Fork计算过程,将1—1001数字相加
     */
    public class TestForkJoinPool {
    
        private static final Integer MAX = 200;
    
        static class MyForkJoinTask extends RecursiveTask<Integer> {
            // 子任务开始计算的值
            private Integer startValue;
    
            // 子任务结束计算的值
            private Integer endValue;
    
            public MyForkJoinTask(Integer startValue , Integer endValue) {
                this.startValue = startValue;
                this.endValue = endValue;
            }
    
            @Override
            protected Integer compute() {
                // 如果条件成立,说明这个任务所需要计算的数值分为足够小了
                // 可以正式进行累加计算了
                if(endValue - startValue < MAX) {
                    System.out.println("开始计算的部分:startValue = " + startValue + ";endValue = " + endValue);
                    Integer totalValue = 0;
                    for(int index = this.startValue ; index <= this.endValue  ; index++) {
                        totalValue += index;
                    }
                    return totalValue;
                }
                // 否则再进行任务拆分,拆分成两个任务
                else {
                    MyForkJoinTask subTask1 = new MyForkJoinTask(startValue, (startValue + endValue) / 2);
                    subTask1.fork();
                    MyForkJoinTask subTask2 = new MyForkJoinTask((startValue + endValue) / 2 + 1 , endValue);
                    subTask2.fork();
                    return subTask1.join() + subTask2.join();
                }
            }
        }
    
        public static void main(String[] args) {
            // 这是Fork/Join框架的线程池
            ForkJoinPool pool = new ForkJoinPool();
            ForkJoinTask<Integer> taskFuture =  pool.submit(new MyForkJoinTask(1,1001));
            try {
                Integer result = taskFuture.get();
                System.out.println("result = " + result);
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace(System.out);
            }
        }
    }
    

    通常这样个模型,你们会想到什么?

    Release Framework ? 常见的处理模型是什么? task pool - worker pool的模型。 但是Forkjoinpool 采取了完全不同的模型。
    ForkJoinPool一种ExecutorService的实现,运行ForkJoinTask任务。ForkJoinPool区别于其它ExecutorService,主要是因为它采用了一种工作窃取(work-stealing)的机制。所有被ForkJoinPool管理的线程尝试窃取提交到池子里的任务来执行,执行中又可产生子任务提交到池子中。
    ForkJoinPool维护了一个WorkQueue的数组(数组长度是2的整数次方,自动增长)。每个workQueue都有任务队列(ForkJoinTask的数组),并且用base、top指向任务队列队尾和队头。work-stealing机制就是工作线程挨个扫描任务队列,如果队列不为空则取队尾的任务并执行。示意图如下


    image.png
    image.png
    image.png

    流程图:


    image.png

    源码解析

    pool属性

       // Instance fields
        volatile long ctl;                   // 控制中心:非常重要,看下图解析
        volatile int runState;               // 负数是shutdown,其余都是2的次方
        final int config;                    // 配置:二进制的低16位代表 并行度(parallelism),//高16位:mode可选FIFO_QUEUE(1 << 16)和LIFO_QUEUE(1 << 31),默认是LIFO_QUEUE
        int indexSeed;                       // 生成worker的queue索引
        volatile WorkQueue[] workQueues;     // main registry
        final ForkJoinWorkerThreadFactory factory;
        final UncaughtExceptionHandler ueh;  // per-worker UEH
        final String workerNamePrefix;       // to create worker name string
        volatile AtomicLong stealCounter;    // also used as sync monitor
    
    • config 是创建ForkJoinPool的配置,int类型32bits,
      a. 高16位表示pool的mode(FIFO或LIFO)
      b. 低16位表示parallelism(并行度,默认大小可用处理器数java.lang.Runtime#availableProcessors);mode有2个值:LIFO_QUEUE = 0,即高16位为0;以及FIFO_QUEUE = 1 << 16,即高16位为1。mode主要用于控制用什么方法来获取任务,如果是先进先出,则用poll方法获取任务,如果是后进先出则用pop获取任务。源码示例:(config & FIFO_QUEUE) == 0 ? pop() : poll();当你new ForkJoinPool时,可以指定你要的并发度(parallelism),这个并发度将存储在config的低16位中
    • ctl 是ForkJoinPool的主要控制字段,long类型64bits,ctl不同的bit位表示不同的含义;
      a. 高16位(63~48)表示活跃的线程,值为活跃线程数减去parallelism(补码表示),初始值是0-parallelism,工作线程激活则加1,去激活则减1。当累积加了parallelism时第63bit位翻转为0,则不允许再激活工作线程。
      b. 第47~32位表示当前所有工作线程(包括未激活的),值为所有工作线程数-parallelism(补码表示),创建线程则加1,终止线程则减1。当累积加了parallelism时第47位翻转位0,则不允许再创建线程;
      c. 第31~16位表示非激活线程链中top线程的版本计数和线程状态,与第15~0位合起来看;
      d. 第15~0位表示非激活线程链中top线程的本地WorkQueue在ForkJoinPool.workQueues数组中下标索引,第31~0位合起来的值实际是非激活线程链中top线程的本地WorkQueue.scanState


      image.png
    • workQueues 是ForkJoinPool维护一个WorkQueue数组,奇数下标的WorkQueue关联一个worker线程,偶数下标的WorkQueue用来接收外部提交的任务(非worker线程提交的任务);
      factory 创建worker线程的工厂;
    @sun.misc.Contended
    public class ForkJoinPool extends AbstractExecutorService {
    ...
    
        // Lower and upper word masks
        private static final long SP_MASK    = 0xffffffffL;
        private static final long UC_MASK    = ~SP_MASK;
    
        // Active counts
        private static final int  AC_SHIFT   = 48;
        private static final long AC_UNIT    = 0x0001L << AC_SHIFT;
        private static final long AC_MASK    = 0xffffL << AC_SHIFT;
    
        // Total counts
        private static final int  TC_SHIFT   = 32;
        private static final long TC_UNIT    = 0x0001L << TC_SHIFT;
        private static final long TC_MASK    = 0xffffL << TC_SHIFT;
        private static final long ADD_WORKER = 0x0001L << (TC_SHIFT + 15); // sign
    
        // runState bits: SHUTDOWN must be negative, others arbitrary powers of two
        private static final int  RSLOCK     = 1;
        private static final int  RSIGNAL    = 1 << 1;
        private static final int  STARTED    = 1 << 2;
        private static final int  STOP       = 1 << 29;
        private static final int  TERMINATED = 1 << 30;
        private static final int  SHUTDOWN   = 1 << 31;
    
        // Instance fields
        volatile long ctl;                   // main pool control
        volatile int runState;               // lockable status
        final int config;                    // parallelism, mode
        int indexSeed;                       // to generate worker index
        volatile WorkQueue[] workQueues;     // main registry
        final ForkJoinWorkerThreadFactory factory;
        final UncaughtExceptionHandler ueh;  // per-worker UEH
        final String workerNamePrefix;       // to create worker name string
        volatile AtomicLong stealCounter;    // also used as sync monitor
    ...
    }
    

    WorkQueue

    workQueues是pool的属性,它是WorkQueue类型的数组。externalPush和externalSubmit所创建的workQueue没有owner(即不是worker),且会被放到workQueues的偶数位置;而createWorker创建的workQueue(即worker)有owner,且会被放到workQueues的奇数位置。

        @sun.misc.Contended
        static final class WorkQueue {
    
            /**
             * Capacity of work-stealing queue array upon initialization.
             * Must be a power of two; at least 4, but should be larger to
             * reduce or eliminate cacheline sharing among queues.
             * Currently, it is much larger, as a partial workaround for
             * the fact that JVMs often place arrays in locations that
             * share GC bookkeeping (especially cardmarks) such that
             * per-write accesses encounter serious memory contention.
             */
            static final int INITIAL_QUEUE_CAPACITY = 1 << 13;
    
            /**
             * Maximum size for queue arrays. Must be a power of two less
             * than or equal to 1 << (31 - width of array entry) to ensure
             * lack of wraparound of index calculations, but defined to a
             * value a bit less than this to help users trap runaway
             * programs before saturating systems.
             */
            static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M
    
            // Instance fields
            volatile int scanState;    // versioned, <0: inactive; odd:scanning
            int stackPred;             // pool stack (ctl) predecessor
            int nsteals;               // number of steals
            int hint;                  // randomization and stealer index hint
            int config;                // pool index and mode
            volatile int qlock;        // 1: locked, < 0: terminate; else 0
            volatile int base;         // index of next slot for poll
            int top;                   // index of next slot for push
            ForkJoinTask<?>[] array;   // the elements (initially unallocated)
            final ForkJoinPool pool;   // the containing pool (may be null)
            final ForkJoinWorkerThread owner; // owning thread or null if shared
            volatile Thread parker;    // == owner during call to park; else null
            volatile ForkJoinTask<?> currentJoin;  // task being joined in awaitJoin
            volatile ForkJoinTask<?> currentSteal; // mainly used by helpStealer
    
    

    WorkQueue的几个重要成员变量说明如下:

    • scanState int类型32bits,各bit位含义如下:
      第31位表示线程状态(1非激活),第30~16位表示版本计数;
      第0位表示worker线程是否在运行任务(1-scanning,0-busy),这里有个小技巧,在创建worker线程的WorkQueue时scanState的第15~0位初始化为ForkJoinPool.workQueues的下标(worker线程的WorkQueue的下标是奇数),当worker线程运行任务时第0位设置0(busy),任务运行结束第0位又设置1(恢复为奇数),所以scanState的第15~0又可以表示在ForkJoinPool.workQueues数组的下标索引;
    • stackPred 当worker线程从激活变为非激活时设置值,且值为ForkJoinPool的ctl的低32位(实际是前一个非激活线程),这样就形成了一个非激活线程链;
    • config 高16位是mode(FIFO或者LIFO),低16位是ForkJoinPool.workQueues的下标;
    • base 任务队列的队尾,工作窃取就是窃取base指向的任务;
    • top 任务队列的队头(指向空),下一个push的位置;
    • array 任务队列;
    • pool 所属的ForkJoinPool实例;
    • owner 所属的worker线程,如果在ForkJoinPool.workQueues数组中下标是奇数,则不为空

    workQueue的config属性

    这是WorkQueue的config,高16位跟pool的config值保持一致,而低16位则是workQueue在workQueues数组的位置。
    从workQueues属性的介绍中,我们知道,不是所有workQueue都有worker,没有worker的workQueue称为公共队列(shared queue),config的第32位就是用来判断是否是公共队列的。在externalSubmit创建工作队列时,有:
    q.config = k | SHARED_QUEUE;
    其中q是新创建的workQueue,k就是q在workQueues数组中的位置,SHARED_QUEUE=1<<31,注意这里config没有保留mode的信息。
    而在registerWorker中,则是这样给workQueue的config赋值的:
    w.config = i | mode;
    w是新创建的workQueue,i是其在workQueues数组中的位置,没有设置SHARED_QUEUE标记位

    scanstate属性

    scanState是workQueue的属性,是int类型的。scanState的低16位可以用来定位当前worker处于workQueues数组的哪个位置。每个worker在被创建时会在其构造函数中调用pool的registerWorker,而registerWorker会给scanState赋一个初始值,这个值是奇数,因为worker是由createWorker创建,并会被放到WorkQueues的奇数位置,而createWorker创建worker时会调用registerWorker。
    简言之,worker的scanState初始值是奇数,非worker的scanstate初始值=INACTIVE=1<<31,小于0(非worker的workQueue在externalSubmit中创建)。
    当每次调用signalWork(或tryRelease)唤醒worker时,worker的高16位就会加1
    另外,scanState<0表示worker未激活,当worker调用runtask执行任务时,scanState会被置为偶数,即设置scanState的最右边一位为0。

    ctl,stackPred,与scanState实现worker休眠栈

    worker休眠时,是这样存储的

    int ctlHigh32=ctl >>>32;
    int ctlLow32=(int)ctl;
    ctl=ctlHigh32+worker.scanState
    worker.preStack=ctlLow32
    

    worker的唤醒类似这样:

    for(worker : pool.workQueues){
            if(worker.scanState==(int)ctl){
                    唤醒worker
                    worker.scanState的高16位加1
                    ctl的低32位=worker.preStack
                    退出循环
    }
    }
    

    在worker休眠的4行伪码中,让ctl的低32位的值变为worker.scanState,这样下次就可以通过scanState唤醒该worker。唤醒该worker时,把该worker的preStack设置为ctl低32位的值,这样下下次唤醒的worker就是scanState等于该preStack的worker。
    这里通过preStack保存下一个worker,这个worker比当前worker更早地在等待,所以形成一个后进先出的栈。

    runState是int类型的值,控制整个pool的运行状态和生命周期,有下面几个值(可以好几个值同时存在):

    RSLOCK  = 1;
    RSIGNAL = 1 << 1;
    STARTED = 1 << 2;
    STOP      = 1 << 29;
    TERMINATED = 1 << 30;
    SHUTDOWN   = 1 << 31;
    

    如果runState值为0,表示pool尚未初始化。
    RSLOCK表示锁定pool,当添加worker和pool终止时,就要使用RSLOCK锁定整个pool。如果由于runState被锁定,导致其他操作等待runState解锁(通常用wait进行等待),当runState设置了RSIGNAL,表示runState解锁,并通知(notifyAll)等待的操作。
    剩下4个值都跟runState生命周期有关,都可以顾名思义:
    当需要停止时,设置runState的STOP值,表示准备关闭,这样其他操作看到这个标记位,就不会继续操作,比如tryAddWorker看到STOP就不会再创建worker:

    if(!stop){
            createWorker()
    }
    

    而tryTerminate对这些生命周期状态的处理则是这样的:

    • 首先设置runState的SHUTDOWN,这样isShutdown等方法可以使用这个状态。然后判断查看是否设置了stop,如果否,则会通过信息worker等方式加快任务的执行,让任务尽快执行完毕,如果是则不会这样。之后开始终止pool,最后设置runState为TERMINATED。
    • 要修改runState值,需要先调用lockRunstate,锁定runstate。lockRunstate是线程安全的,如果锁定失败,线程会调用wait等待。如果锁定成功,则使用unLockRunstate(oldRunstate,newRunstate),修改runstate值。unLockRunstate执行成功会调用notify唤醒那些在lockRunstate中等待的线程。

    当前top和base的初始值为 INITIAL_QUEUE_CAPACITY >>>1= (1 << 13)>>>1 = 8192/2。然后push一个task之后,top+=1,也就是说,top对应的位置是没有task的,最近push进来的task在top-1的位置。而base的位置则能对应到task,base对应最先放进队列的task,top-1对应最后放进队列的task。

    workQueue的qlock

    qlock值含义:1: locked, < 0: terminate; else 0
    即当qlock值位0时,可以正常操作,值=1时,表示锁定

    相关算法解释

    求偶算法

    int SQMASK=0x007e,则任何整数跟SQMASK位与后,得到的数就是偶数。
    证明:
    注意这里化为二进制是0111 1110,尤其注意最右边第一位是0,任何数跟最右边第一位是0的数位与后,得到的数就是偶数,因为位与之后,第一位就是0,比如s=A&SQMASK,A可以是任意整数,然后把s按二进制进行多项式展开,则有s=2n1+2n2 ……+2^nn,这里n≥1,所以s可以被2整除,即s是偶数。
    所以一个数是奇数还是偶数,看其最右边第一位即可。

    workQueue的hint属性与“奇数自加散列”算法

    我们知道workQueue有externalPush创建的和createWorker创建的worker,两种方式创建的workQueue,其放置到workQueues的位置是不同的,前者放到workQueue的偶数位置,而后者则放到奇数位置。不同workQueue找到自己在workQueues的位置的算法有点不同。
    下面看一下forkjoin框架获取workQueues中的偶数位置的workQueue的算法:

    int r=ThreadLocalRandom.getProbe()
    int m=workQueues.length-1,这里workQueues.length是2的指数幂
    int SQMASK=0x007e
    workQueue=workQueues[m & r & SQMASK]
    

    这样就能获取workQueues的偶数位置的workQueue。m保证m & r & SQMASK这整个运算结果不会超出workQueues的下标,SQMASK保证取到的是偶数位置的workQueue。这里有一个有趣的现象,假设0到workQueues.length-1之间有n个偶数,m & r & SQMASK每次都能取到其中一个偶数,而且连续n次取到的偶数不会出现重复值,散列性非常好。而且是循环的,即1到n次取n个不同偶数,n+1到2n也是取n次不同偶数,此时n个偶数每个都被重新取一次。下面分析下r值有什么秘密,为何能保证这样的散列性
    ThreadLocalRandom内有一常量PROBE_INCREMENT = 0x9e3779b9,以及一个静态的probeGenerator =new AtomicInteger() ,然后每个线程的probe= probeGenerator.addAndGet(PROBE_INCREMENT)所以第一个线程的probe值是0x9e3779b9,第二个线程的值就是0x9e3779b9+0x9e3779b9,第三个线程的值就是0x9e3779b9+0x9e3779b9+0x9e3779b9以此类推,整个值是线性的,可以用y=kx表示,其中k=0x9e3779b9,x表示第几个线程。这样每个线程的probe可以保证不一样,而且具有很好的离散性。
    实际上,可以不用0x9e3779b9这个值,用任意一个奇数都是可以的,比如1。如果用1的话,probe+=1,这样每个线程的probe就都是不同的,而且具有很好的离散性。也就是说,假设有限制条件probe<n,超过n则产生溢出。则probe自加n次后才会开始出现重复值,n次前probe每次自加的值都不同。实际上用任意一个奇数,都可以保证probe自加n次后才会开始出现重复值,有兴趣可看本文最后附录部分。由于奇数的离散性,所以只要线程数小于m或者SQMASK两者中的最小值,则每个线程都能唯一地占据一个ws中的一个位置

    • externalPush等外部操作创建的workQueue:使用上面介绍的方法来获取偶数
    • createWorker:与externalPush不同的是,pool内部有一个静态常量SEED_INCREMENT=0x9e3779b9,以及一个普通属性indexSeed=0
      int r=indexSeed += SEED_INCREMENT,所以获取的值跟externalPush是差不多的
      无论哪种方式,最终都是workQueue.hint=r,即workQueue.hint的值就是用来定位workQueue所用的r。

    任务提交

    内部外部

    当一个操作是在非ForkjoinThread的线程中进行的,则称该操作为外部操作。比如我们前面执行pool.invoke,invoke内又执行externalPush。由于invoke是在非ForkjoinThread线程中进行的(这里是在main线程中进行),所以是一个外部操作,调用的是externalPush。之后task的执行是通过ForkJoinThread来执行的,所以task中的fork就是内部操作,调用的是push,把任务提交到工作队列。其实fork的实现是类似下面这样的:

    if(Thread.currentThread() instanceof ForkJoinThread){
            push(this)
    }else{
            externaPush(this)
    }
    

    即fork会根据执行自身的线程是否是ForkJoinThread的实例来判断是处于外部还是内部。那为何要区分内外部?
    任何线程都可以使用ForkJoin框架,但是对于非ForkJoinThread的线程,它到底是怎样的,ForkJoin无法控制,也无法对其优化。因此区分出内外部,这样方便ForkJoin框架对任务的执行进行控制和优化
    forkJoinPool.invoke(task)是把任务放入工作队列,并等待任务执行。源码如下

        public <T> T invoke(ForkJoinTask<T> task) {
            if (task == null)
                throw new NullPointerException();
            externalPush(task);
            return task.join();
        }
    

    这里externalPush负责任务提交,externalPush源码如下:

        final void externalPush(ForkJoinTask<?> task) {
            WorkQueue[] ws; WorkQueue q; int m;
            int r = ThreadLocalRandom.getProbe();
            // runState 有如下值
           /*
               private static final int  RSLOCK     = 1;
               private static final int  RSIGNAL    = 1 << 1;
               private static final int  STARTED    = 1 << 2;
               private static final int  STOP       = 1 << 29;
               private static final int  TERMINATED = 1 << 30;
               private static final int  SHUTDOWN   = 1 << 31;
           */
             // 如果runState= 0 说明 无状态, 没有其他线程在执行其他操作, 比如锁定线程池, 终止线程池,小于0 表示shutdown 状态
        
            int rs = runState;
            // workQueues !=null 说明 初始化过
            if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&
                (q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 &&
                //q = ws[m & r & SQMASK]) 是偶数的下标 ws.length 永远是偶数 ws.length -1 & r 不可能大于 ws.length-1 再与偶数位就一定是偶数
    
                U.compareAndSwapInt(q, QLOCK, 0, 1)) {
                 // 实现无锁锁定
                ForkJoinTask<?>[] a; int am, n, s;
                if ((a = q.array) != null &&
                    (am = a.length - 1) > (n = (s = q.top) - q.base)) {
                    int j = ((am & s) << ASHIFT) + ABASE;
                    U.putOrderedObject(a, j, task);
                    U.putOrderedInt(q, QTOP, s + 1);
                    U.putIntVolatile(q, QLOCK, 0);
                    if (n <= 1)
                        signalWork(ws, q);
                    return;
                }
                U.compareAndSwapInt(q, QLOCK, 1, 0);
            }
            externalSubmit(task);
        }
    

    相关文章

      网友评论

        本文标题:ForkjoinPool -1

        本文链接:https://www.haomeiwen.com/subject/orvedrtx.html