美文网首页
java并发编程(4):ForkJoinPool框架源码详解

java并发编程(4):ForkJoinPool框架源码详解

作者: 桥头放牛娃 | 来源:发表于2019-10-30 23:11 被阅读0次

    1、ForkJoinPool简介

    ForkJoinPool运用fork-join的原理,使用分而治之的思想,将大任务进行拆分,直到拆分成无法可再拆分的最小单元,并将拆分后的任务分配给多线程执行,最终再将执行结果进行join。同时利用工作窃取算法,使得任务能及时被空闲线程处理。故ForkJoinPool适于可将大任务分割成类似的小任务的场景。

    1.1、ForkJoinPool类继承结构

    ForkJoinPool也继承于AbstractExecutorService抽象类,实现ExecutorService相关接口。

    ForkJoinPool类继承结构.png

    1.2、ForkJoinPool体系主要类简介

    ForkJoinTask:提交到ForkJoinPool中的任务,在使用时主要有三个实现。RecursiveTask:可以递归执行的ForkJoinTask;RecursiveAction:无返回值的RecursiveTask;CountedCompleter:执行完成后,触发自定义钩子函数。

    ForkJoinWorkerThread:运行 ForkJoinTask 任务的工作线程。每个ForkJoinWorkerThread都关联其所属的ForkJoinPool及其工作队列WorkQueue。

    WorkQueue:任务队列,支持LIFO的栈式操作和FIFO的队列操作。

    2、ForkJoinTask源码解析

    ForkJoinTask将任务fork成足够小的任务,并发解决这些小任务,然后将这些小任务结果join。这种思想充分利用了CPU的多核系统,使得CPU的利用率得到大幅度提升,减少了任务执行时间。通常我们会利用ForkJoinTask的fork方法来分割任务,利用join方法来合并任务。

    2.1、ForkJoinTask任务状态

    statue为ForkJoinTask的状态,其初始状态为0,标识正则处理任务状态;NORMAL:标识任务正常结束;CANCELLED:标识任务被取消;EXCEPTIONAL:标识任务执行异常;SIGNAL:表示有依赖当前任务结果的任务,需要执行完成后进行通知。

    //任务状态,初始值为0
    volatile int status; // accessed directly by pool and workers
    // 任务状态的掩码
    static final int DONE_MASK   = 0xf0000000;  
    // 正常状态,负数,标识任务已经完成
    static final int NORMAL      = 0xf0000000;  
    // 任务取消,非负,<NORMAL
    static final int CANCELLED   = 0xc0000000;  
    // 任务异常,非负,<CANCELLED
    static final int EXCEPTIONAL = 0x80000000;  
    // 通知状态,>= 1<<16,有其他任务依赖当前任务,任务结束前,通知其他任务join当前任务的结果。
    static final int SIGNAL      = 0x00010000;  
    // 低位掩码
    static final int SMASK       = 0x0000ffff;  
    

    2.2、主要方法实现

    ForkJoinTask的主要方法有异步执行方法fork(),获取结果方法join(),执行任务方法invoke系列,其他获取状态即结果等方法。

    fork()源码解析:

    public final ForkJoinTask<V> fork() {
        Thread t;
        //如果线程类型为ForkJoinWorkerThread,则将任务推入workQueue进行处理,
        //否则,交由ForkJoinPool的common线程池进行处理
        if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
            ((ForkJoinWorkerThread)t).workQueue.push(this);
        else
            ForkJoinPool.common.externalPush(this);
        return this;
    }
    

    join()源码解析:

    public final V join() {
        int s;
        //调用doJoin()进行任务的执行,若任务结果为非正常完成,则根据状态抛出不同的异常,
        //如若状态为CANCELLED,则抛出CancellationException(),异常;
        //若状态为EXCEPTIONAL,则抛出包装后的异常
        if ((s = doJoin() & DONE_MASK) != NORMAL)
            reportException(s);
        return getRawResult();
    }
    
    private int doJoin() {
        int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
        //1、若任务状态为正常完成(status < 0),则返回任务的正常完成状态;
        //2、若执行任务的当前线程类型为ForkJoinWorkerThread,且将任务从线程的工作队列中移除成功,
        //则调用doExec()执行任务,若任务执行状态为正常结束,则返回状态,否则awaitJoin()等待任务结束。
        //3、否则调用externalAwaitDone()等待任务执行完成。
        return (s = status) < 0 ? s :
            ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
            (w = (wt = (ForkJoinWorkerThread)t).workQueue).
            tryUnpush(this) && (s = doExec()) < 0 ? s :
            wt.pool.awaitJoin(w, this, 0L) :
            externalAwaitDone();
    }
    
    final int doExec() {
        int s; boolean completed;
        //任务未完成?
        if ((s = status) >= 0) {
            try {
                //执行任务
                completed = exec();
            } catch (Throwable rex) {
                return setExceptionalCompletion(rex);
            }
            //设置任务状态为正常完成
            if (completed)
                s = setCompletion(NORMAL);
        }
        return s;
    }
    
    private int externalAwaitDone() {
        //任务处理
        int s = ((this instanceof CountedCompleter) ? // try helping
                 ForkJoinPool.common.externalHelpComplete(
                     (CountedCompleter<?>)this, 0) :
                 ForkJoinPool.common.tryExternalUnpush(this) ? doExec() : 0);
        if (s >= 0 && (s = status) >= 0) {
            boolean interrupted = false;
            do {
                //等待任务完成
                if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
                    synchronized (this) {
                        if (status >= 0) {
                            try {
                                wait(0L);
                            } catch (InterruptedException ie) {
                                interrupted = true;
                            }
                        }
                        //任务完成后通知其他依赖的任务
                        else
                            notifyAll();
                    }
                }
            } while ((s = status) >= 0);
            if (interrupted)
                Thread.currentThread().interrupt();
        }
        return s;
    }
    

    invoke系列方法源码解析:

    public final V invoke() {
        int s;
        //执行任务并返回状态,处理同doJoin()类似
        if ((s = doInvoke() & DONE_MASK) != NORMAL)
            reportException(s);
        return getRawResult();
    }
    
    
    private int doInvoke() {
        int s; Thread t; ForkJoinWorkerThread wt;
        //执行任务并获取任务状态,状态<0表示正常完成,否则等待任务完成
        return (s = doExec()) < 0 ? s :
            ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
            (wt = (ForkJoinWorkerThread)t).pool.
            awaitJoin(wt.workQueue, this, 0L) :
            externalAwaitDone();
    }
    
    //处理两个任务
    public static void invokeAll(ForkJoinTask<?> t1, ForkJoinTask<?> t2) {
        int s1, s2;
        //提交任务t2,交由线程池执行
        t2.fork();
        //执行任务t1并获取直接结果的任务状态
        if ((s1 = t1.doInvoke() & DONE_MASK) != NORMAL)
            t1.reportException(s1);
        //获取任务t2的直接结果状态
        if ((s2 = t2.doJoin() & DONE_MASK) != NORMAL)
            t2.reportException(s2);
    }
    
    //处理多个任务
    public static void invokeAll(ForkJoinTask<?>... tasks) {
        Throwable ex = null;
        int last = tasks.length - 1;
        for (int i = last; i >= 0; --i) {
            ForkJoinTask<?> t = tasks[i];
            //任务为空则跑NPE异常
            if (t == null) {
                if (ex == null)
                    ex = new NullPointerException();
            }
            //非最后一个任务,则推入线程池执行
            else if (i != 0)
                t.fork();
            //最后一个任务直接调用doInvoke()执行    
            else if (t.doInvoke() < NORMAL && ex == null)
                ex = t.getException();
        }
        //遍历任务,获取任务执行结果
        for (int i = 1; i <= last; ++i) {
            ForkJoinTask<?> t = tasks[i];
            if (t != null) {
                //若有某个任务执行有异常,则取消所有任务
                if (ex != null)
                    t.cancel(false);
                //获取任务执行结果,若结果非正常结束,获取异常结果    
                else if (t.doJoin() < NORMAL)
                    ex = t.getException();
            }
        }
        if (ex != null)
            rethrow(ex);
    }
    
    
    

    获取任务状态及执行结果等方法:

    方法名 说明
    cancel 取消任务
    isDone 判断任务是否正常完成
    isCancelled 判断任务是否已取消
    isCompletedAbnormally 判断任务是否非正常完成,如被取消或任务执行异常等
    isCompletedNormally 判断任务是否执行正常完成,及任务状态是否为NORMAL
    getException 获取任务执行的异常结果
    completeExceptionally 将任务状态设置为异常,并设置异常结果
    complete 将任务设置正常结束,并设置任务执行结果
    get 获取任务执行结果,若任务取消或异常,则抛出异常;否则返回任务执行结果

    3、WorkQueue源码详解

    WorkQueue为ForkJoinPool的工作队列,其封装提交的任务ForkJoinTask、线程池ForkJoinPool、执行线程ForkJoinWorkerThread、及其他任务相关数据等。

    3.1、主要属性说明

    //
    volatile int scanState;    // versioned, <0: inactive; odd:scanning
    int stackPred;             // pool stack (ctl) predecessor
    int nsteals;               // number of steals
    int hint;                  // randomization and stealer index hint
    int config;                // pool index and mode
    volatile int qlock;        // 1: locked, < 0: terminate; else 0
    volatile int base;         // index of next slot for poll
    int top;                   // index of next slot for push
    ForkJoinTask<?>[] array;   // the elements (initially unallocated)
    final ForkJoinPool pool;   // the containing pool (may be null)
    final ForkJoinWorkerThread owner; // owning thread or null if shared
    volatile Thread parker;    // == owner during call to park; else null
    volatile ForkJoinTask<?> currentJoin;  // task being joined in awaitJoin
    volatile ForkJoinTask<?> currentSteal; // mainly used by helpStealer
    

    scanState:如果WorkQueue没有属于自己的owner(下标为偶数的都没有),该值为 inactive 也就是一个负数;如果有自己的owner,该值的初始值为其在WorkQueue[]数组中的下标,也肯定是个奇数;如果这个值,变成了偶数,说明该队列所属的Thread正在执行Task。
    stackPred:前任池(WorkQueue[])索引,由此构成一个栈;
    config:index | mode。 如果下标为偶数的WorkQueue,则其mode是共享类型。如果有自己的owner 默认是 LIFO;
    **qlock: **锁标识,在多线程往队列中添加数据,会有竞争,使用此标识抢占锁。1: locked, < 0: terminate; else 0
    base:worker steal的偏移量,因为其他的线程都可以偷该队列的任务,所有base使用volatile标识。
    top:owner执行任务的偏移量。
    parker:如果 owner 挂起,则使用该变量做记录挂起owner的线程。
    **currentJoin: **当前正在join等待结果的任务。
    currentSteal:当前执行的任务是steal过来的任务,该变量做记录。

    3.2、主要方法说明

    入队方法:

    //工作线程将任务提交到其对应的工作队列中
    final void push(ForkJoinTask<?> task) {
        ForkJoinTask<?>[] a; ForkJoinPool p;
        int b = base, s = top, n;
        if ((a = array) != null) {    // ignore if queue removed
            int m = a.length - 1;     // fenced write for task visibility
            //计算放置任务的位置,并将任务保存到队列中
            U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);
            //修改顶部数据
            U.putOrderedInt(this, QTOP, s + 1);
            //
            if ((n = s - b) <= 1) {
                if ((p = pool) != null)
                    p.signalWork(p.workQueues, this);
            }
            //队列已满,则进行扩容
            else if (n >= m)
                growArray();
        }
    }
    

    出队方法:

    final ForkJoinTask<?> pop() {
       ForkJoinTask<?>[] a; ForkJoinTask<?> t; int m;
       if ((a = array) != null && (m = a.length - 1) >= 0) {
           for (int s; (s = top - 1) - base >= 0;) {
               long j = ((m & s) << ASHIFT) + ABASE;
               //数组下班j处有任务,则cas获取获取任务,并修改top值
               if ((t = (ForkJoinTask<?>)U.getObject(a, j)) == null)
                   break;
               if (U.compareAndSwapObject(a, j, t, null)) {
                   U.putOrderedInt(this, QTOP, s);
                   return t;
               }
           }
       }
       return null;
    }
    
    //从base到top获取任务
    final ForkJoinTask<?> poll() {
       ForkJoinTask<?>[] a; int b; ForkJoinTask<?> t;
       while ((b = base) - top < 0 && (a = array) != null) {
           //获取base数据偏移量
           int j = (((a.length - 1) & b) << ASHIFT) + ABASE;
           //获取base处任务
           t = (ForkJoinTask<?>)U.getObjectVolatile(a, j);
           //base未被改动?
           if (base == b) {
               if (t != null) {
                   //cas更改base处的数据,同时base+1
                   if (U.compareAndSwapObject(a, j, t, null)) {
                       base = b + 1;
                       return t;
                   }
               }
               //任务全部取完?
               else if (b + 1 == top) // now empty
                   break;
           }
       }
       return null;
    }
    
    final ForkJoinTask<?> peek() {
       ForkJoinTask<?>[] a = array; int m;
       if (a == null || (m = a.length - 1) < 0)
           return null;
       //判断任务队列是FIFO或LILF?    
       int i = (config & FIFO_QUEUE) == 0 ? top - 1 : base;
       int j = ((i & m) << ASHIFT) + ABASE;
       //获取指定顶部或底部的任务
       return (ForkJoinTask<?>)U.getObjectVolatile(a, j);
    }
    

    执行任务:

    final void execLocalTasks() {
        int b = base, m, s;
        ForkJoinTask<?>[] a = array;
        if (b - (s = top - 1) <= 0 && a != null &&
            (m = a.length - 1) >= 0) {
            //队列类型为FIFO?    
            if ((config & FIFO_QUEUE) == 0) {
                //遍历任务并执行
                for (ForkJoinTask<?> t;;) {
                    if ((t = (ForkJoinTask<?>)U.getAndSetObject
                         (a, ((m & s) << ASHIFT) + ABASE, null)) == null)
                        break;
                    U.putOrderedInt(this, QTOP, s);
                    t.doExec();
                    if (base - (s = top - 1) > 0)
                        break;
                }
            }
            else
                pollAndExecAll();
        }
    }
    
    //执行任务
    final void runTask(ForkJoinTask<?> task) {
        if (task != null) {
            //设置WorkQueue状态为执行任务状态
            scanState &= ~SCANNING; // mark as busy
            //执行窃取的任务
            (currentSteal = task).doExec();
            U.putOrderedObject(this, QCURRENTSTEAL, null); // release for GC
            //执行所有本地
            execLocalTasks();
            ForkJoinWorkerThread thread = owner;
            if (++nsteals < 0)      // collect on overflow
                transferStealCount(pool);
            scanState |= SCANNING;
            if (thread != null)
                thread.afterTopLevelExec();
        }
    }
    

    4、ForkJoinWorkerThread源码解析

    ForkJoinWorkerThread为ForkJoinPool的运行线程实现类,其关联了对应的ForkJoinPool及WorkQueue。

    构造函数:

    //构造函数
    protected ForkJoinWorkerThread(ForkJoinPool pool) {
        // Use a placeholder until a useful name can be set in registerWorker
        super("aForkJoinWorkerThread");
        this.pool = pool;
        this.workQueue = pool.registerWorker(this);
    }
    

    执行任务的钩子函数:

    //线程启动时的钩子函数
    protected void onStart() {
    }
    //线程结束的钩子函数
    protected void onTermination(Throwable exception) {
    }
    

    执行任务:

    public void run() {
        if (workQueue.array == null) { // only run once
            Throwable exception = null;
            try {
                //线程开始钩子
                onStart();
                //执行任务
                pool.runWorker(workQueue);
            } catch (Throwable ex) {
                exception = ex;
            } finally {
                try {
                    //线程终止钩子
                    onTermination(exception);
                } catch (Throwable ex) {
                    if (exception == null)
                        exception = ex;
                } finally {
                    pool.deregisterWorker(this, exception);
                }
            }
        }
    }
    
    //ForkJoinPool#runWorker()
    final void runWorker(WorkQueue w) {
        w.growArray();                   // allocate queue
        int seed = w.hint;               // initially holds randomization hint
        int r = (seed == 0) ? 1 : seed;  // avoid 0 for xorShift
        for (ForkJoinTask<?> t;;) {
            //扫描任务,并执行任务
            if ((t = scan(w, r)) != null)
                w.runTask(t);
            //等待窃取任务  
            else if (!awaitWork(w, r))
                break;
            r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
        }
    }
    

    5、ForkJoinPool源码详解

    5.1、主要属性

    // Bounds
    //低16位掩码,索引的最大位数,
    static final int SMASK        = 0xffff;        // short bits == max index
    //工作线程的最大容量
    static final int MAX_CAP      = 0x7fff;        // max #workers - 1
    //偶数低位掩码
    static final int EVENMASK     = 0xfffe;        // even short bits
    //偶数槽位数,最多64个偶数槽位(0x007e = 0111 1110,有效的是中间6个1的位置,111111 = 63,再加上000000(0槽位),总共64个)
    static final int SQMASK       = 0x007e;        // max 64 (even) slots
    
    // Masks and units for WorkQueue.scanState and ctl sp subfield
    //WorkQueue的状态:正在扫描任务
    static final int SCANNING     = 1;             // false when running tasks
    //WorkQueue的状态:非活动状态
    static final int INACTIVE     = 1 << 31;       // must be negative
    //版本号(防止CAS的ABA问题)
    static final int SS_SEQ       = 1 << 16;       // version count
    
    // Mode bits for ForkJoinPool.config and WorkQueue.config
    //模式掩码
    static final int MODE_MASK    = 0xffff << 16;  // top half of int
    //任务队列模式为LIFO
    static final int LIFO_QUEUE   = 0;
    //任务队列模式为FIFO
    static final int FIFO_QUEUE   = 1 << 16;
    //任务队列模式为共享模式
    static final int SHARED_QUEUE = 1 << 31;       // must be negative
    
    
    //线程工厂类
    public static final ForkJoinWorkerThreadFactory
        defaultForkJoinWorkerThreadFactory;
    
    //默认的公共线程池
    static final ForkJoinPool common;
    
    //并行度
    static final int commonParallelism;
    
    //最大备用线程数
    private static int commonMaxSpares;
    
    //线程变化序列号
    private static int poolNumberSequence;
    
    
    //ctl的低32位掩码
    private static final long SP_MASK    = 0xffffffffL;
    //ctl的高32位掩码
    private static final long UC_MASK    = ~SP_MASK;
    
    // Active counts
    //活跃线程的计算shift
    private static final int  AC_SHIFT   = 48;
    //活跃线程的最小单位
    private static final long AC_UNIT    = 0x0001L << AC_SHIFT;
    //活跃线程数的掩码
    private static final long AC_MASK    = 0xffffL << AC_SHIFT;
    
    // Total counts
    //工作线程shift
    private static final int  TC_SHIFT   = 32;
    //工作线程的最小单元
    private static final long TC_UNIT    = 0x0001L << TC_SHIFT;
    //工作线程掩码
    private static final long TC_MASK    = 0xffffL << TC_SHIFT;
    //创建工作线程的标记
    private static final long ADD_WORKER = 0x0001L << (TC_SHIFT + 15); // sign
    
    // runState bits: SHUTDOWN must be negative, others arbitrary powers of two
    //线程池状态
    //锁定
    private static final int  RSLOCK     = 1;  
    //通知
    private static final int  RSIGNAL    = 1 << 1;
    //开始
    private static final int  STARTED    = 1 << 2;
    //停止
    private static final int  STOP       = 1 << 29;
    //终止
    private static final int  TERMINATED = 1 << 30;
    //关闭
    private static final int  SHUTDOWN   = 1 << 31;
    
    // Instance fields
    //线程池主控参数
    volatile long ctl;                   // main pool control
    //线程池运行状态
    volatile int runState;               // lockable status
    //并行度|模式
    final int config;                    // parallelism, mode
    //用于生成线程池的索引
    int indexSeed;                       // to generate worker index
    //工作队列池
    volatile WorkQueue[] workQueues;     // main registry
    //线程工厂
    final ForkJoinWorkerThreadFactory factory;
    //工作线程异常处理
    final UncaughtExceptionHandler ueh;  // per-worker UEH
    //工作线程名称的前缀
    final String workerNamePrefix;       // to create worker name string
    //偷取任务的总数
    volatile AtomicLong stealCounter;    // also used as sync monitor
    
    

    5.2、状态说明

    ctl参数说明:

    ctl参数说明.png

    字段ctl是ForkJoinPool的核心状态,它是一个64位的long类型数值,包含4个16位子字段:

    • AC: 活动的工作线程数量减去目标并行度(目标并行度:最大的工作线程数量,所以AC一般是负值,等于0时,说明活动线程已经达到饱和了)
    • TC: 总的工作线程数量总数减去目标并行度(TC一般也是负值,等于0时,说明总的工作线程已经饱和,并且,AC一般小于等于TC)
    • SS: 栈顶工作线程状态和版本数(每一个线程在挂起时都会持有前一个等待线程所在工作队列的索引,由此构成一个等待的工作线程栈,栈顶是最新等待的线程,第一位表示状态1.不活动 0.活动,后15表示版本号,标识ID的版本-最后16位)。
    • ID: 栈顶工作线程所在工作队列的池索引。

    runState状态说明:

    • STARTED 1
    • STOP 1 << 1
    • TERMINATED 1<<2
    • SHUTDOWN 1<<29
    • RSLOCK 1<<30
    • RSIGNAL 1<<31

    runState记录了线程池的运行状态,特别地,除了SHUTDOWN是负数外,其他值都是正数,RSLOCK和RSIGNAL是跟锁相关。

    5.3、主要方法

    5.3.1、构造方法

    //parallelism:并行度
    //factory:线程工厂;
    //handler:异常处理
    //asyncMode:队列模式,true:FIFO,false:LIFO
    public ForkJoinPool(int parallelism,
                        ForkJoinWorkerThreadFactory factory,
                        UncaughtExceptionHandler handler,
                        boolean asyncMode) {
        this(checkParallelism(parallelism),
             checkFactory(factory),
             handler,
             asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
             "ForkJoinPool-" + nextPoolId() + "-worker-");
        checkPermission();
    }
    

    5.3.2、提交任务

    invoke/execute/submit任务提交:

    //提交任务并等待任务执行完成,然后返回执行结果
    public <T> T invoke(ForkJoinTask<T> task) {
        if (task == null)
            throw new NullPointerException();
        externalPush(task);
        return task.join();
    }
    
    //只提交任务
    public void execute(ForkJoinTask<?> task) {
        if (task == null)
            throw new NullPointerException();
        externalPush(task);
    }
    
    //提交任务并返回任务,ForkJoinTask可获取任务的异步执行结果
    public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
        if (task == null)
            throw new NullPointerException();
        externalPush(task);
        return task;
    }
    

    提交任务主要有三中方法,invoke(),execute(),submit(),它们最终都是调用externalPush()进行处理,都属于外部提交,置于偶数索引的工作队列。

    externalPush()添加任务:

    final void externalPush(ForkJoinTask<?> task) {
        WorkQueue[] ws; WorkQueue q; int m;
        //探针值,用于计算WorkQueue槽位索引
        int r = ThreadLocalRandom.getProbe();
        int rs = runState;
        if ((ws = workQueues) != null  //线程池不为空
            && (m = (ws.length - 1)) >= 0   //线程池长度大于0
            && (q = ws[m & r & SQMASK]) != null  //获取偶数槽位的WorkQueue
            && r != 0 && rs > 0  //探针值不为0
            &&U.compareAndSwapInt(q, QLOCK, 0, 1)) { //加锁
            ForkJoinTask<?>[] a; int am, n, s;
            if ((a = q.array) != null //线程的任务队列不为空
                &&(am = a.length - 1) > (n = (s = q.top) - q.base)) { //任务数组长度大于数组中的任务个数,则无需扩容
                int j = ((am & s) << ASHIFT) + ABASE;  //计算任务的位置索引
                    U.putOrderedObject(a, j, task);//将任务放入任务数组中
                U.putOrderedInt(q, QTOP, s + 1); //设置top为top+1
                U.putIntVolatile(q, QLOCK, 0); //解锁
                //若之前的任务数<=1,则此槽位的线程可能在等待,同时可能其他槽位的线程也在等,此时需要唤醒线程来执行任务
                if (n <= 1)
                    signalWork(ws, q);
                return;
            }
            U.compareAndSwapInt(q, QLOCK, 1, 0); //添加任务失败,则解锁
        }
        //若if条件中有不满足的,或是添加任务失败,则通过externalSubmit()来添加任务
        externalSubmit(task);
    }
    

    signalWork()唤醒worker线程:

    final void signalWork(WorkQueue[] ws, WorkQueue q) {
        long c; int sp, i; WorkQueue v; Thread p;
        while ((c = ctl) < 0L) {     //活跃线程数太少,则创建工作线程
            if ((sp = (int)c) == 0) {    //无空闲线程?
                // (c & ADD_WORKER) != 0L,说明TC的最高位为1,为负值,而TC = 总的线程数 - 并行度 < 0,
                // 表示总的线程数 < 并行度,说明工作线程的个数还很少
                if ((c & ADD_WORKER) != 0L) 
                    tryAddWorker(c); //尝试添加线程
                break;
            }
            //未开始或已停止
            if (ws == null)                            // unstarted/terminated
                break;
            // 空闲线程栈顶端线程的所属工作队列索引(正常来讲,应该小于WorkQueue[]的长度的)   
            if (ws.length <= (i = sp & SMASK))         // terminated
                break;
            //正则终止?      
            if ((v = ws[i]) == null)                   // terminating
                break;
            // 作为下一个scanState待更新的值(增加了版本号,并且调整为激活状态)   
            int vs = (sp + SS_SEQ) & ~INACTIVE;        // next scanState
            // 如果d为0,则说明scanState还为更新过,然后才考虑CAS ctl
            int d = sp - v.scanState;                  // screen CAS
            // 下一个ctl的值,AC + 1 | 上一个等待线程的索引
            long nc = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & v.stackPred);
            if (d == 0 && U.compareAndSwapLong(this, CTL, c, nc)) {
                v.scanState = vs;                      // activate v
                // 如果线程阻塞了,唤醒它
                if ((p = v.parker) != null)
                    U.unpark(p);
                break;
            }
            // 没有任务,直接退出
            if (q != null && q.base == q.top)          // no more work
                break;
        }
    }
    

    externalSubmit()添加任务:

    externalSubmit()包含了三方面的操作:

    • 若线程未初始化,则初始化线程池,长度是2的幂次方;
    • 若选中的槽位位空,则初始化一个共享模式的工作队列;
    • 若选中槽位不为空,则获取任务队列,并将任务提交到任务队列,成功则唤醒沉睡的线程;若失败则专业槽位。
    private void externalSubmit(ForkJoinTask<?> task) {
        int r;                                    // initialize caller's probe
        //初始化当前线程的探针值,用于计算WorkQueue的索引
        if ((r = ThreadLocalRandom.getProbe()) == 0) {
            ThreadLocalRandom.localInit();
            r = ThreadLocalRandom.getProbe();
        }
        for (;;) {
            WorkQueue[] ws; WorkQueue q; int rs, m, k;
            boolean move = false;
            //线程池已经关闭?
            if ((rs = runState) < 0) {
                tryTerminate(false, false);     // help terminate
                throw new RejectedExecutionException();
            }
            //1、线程池状态为还未初始化?;
            //2、线程池为空?
            //3、线程池中工作线程数为0?
            else if ((rs & STARTED) == 0 ||     // initialize
                     ((ws = workQueues) == null || 
                     (m = ws.length - 1) < 0)) {
                int ns = 0;
                //加锁
                rs = lockRunState();
                try {
                    //加锁后再次判断线程池状态,避免重复初始化
                    if ((rs & STARTED) == 0) {
                        U.compareAndSwapObject(this, STEALCOUNTER, null,
                                               new AtomicLong());
                        // create workQueues array with size a power of two
                        int p = config & SMASK; // ensure at least 2 slots
                        //保证n是2的幂次方
                        int n = (p > 1) ? p - 1 : 1;
                        n |= n >>> 1; n |= n >>> 2;  n |= n >>> 4;
                        n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1;
                        workQueues = new WorkQueue[n];
                        ns = STARTED;
                    }
                } finally {
                    //解锁
                    unlockRunState(rs, (rs & ~RSLOCK) | ns);
                }
            }
            //获取随机偶数槽位的WorkQueue
            else if ((q = ws[k = r & m & SQMASK]) != null) {
                //对WorkQueue加锁
                if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) {
                    ForkJoinTask<?>[] a = q.array;
                    int s = q.top;
                    boolean submitted = false; // initial submission or resizing
                    try {                      // locked version of push
                        //若WorkQueue的任务队列为空,则初始化任务队列(growArray)
                        if ((a != null && a.length > s + 1 - q.base) ||
                            (a = q.growArray()) != null) {
                            //计算任务索引的下标    
                            int j = (((a.length - 1) & s) << ASHIFT) + ABASE;
                            U.putOrderedObject(a, j, task);
                            U.putOrderedInt(q, QTOP, s + 1);
                            submitted = true;
                        }
                    } finally {
                        //解锁
                        U.compareAndSwapInt(q, QLOCK, 1, 0);
                    }
                    //唤醒挂起的线程
                    if (submitted) {
                        signalWork(ws, q);
                        return;
                    }
                }
                move = true;                   // move on failure
            }
            //在未加锁的情况下,创建新线程
            else if (((rs = runState) & RSLOCK) == 0) { // create new queue
                q = new WorkQueue(this, null);
                q.hint = r;
                //共享模式
                q.config = k | SHARED_QUEUE;
                //未激活
                q.scanState = INACTIVE;
                //加锁
                rs = lockRunState();           // publish index
                if (rs > 0 &&  (ws = workQueues) != null &&
                    k < ws.length && ws[k] == null)
                    ws[k] = q;                 // else terminated
                //释放锁    
                unlockRunState(rs, rs & ~RSLOCK);
            }
            else
                move = true;                   // move if busy
            if (move)
                r = ThreadLocalRandom.advanceProbe(r);
        }
    }
    

    5.3.3、执行任务

    runWorker()是在ForkJoinWorkerThread的run()方法中调用,即在启动worker线程调用的。其主要工作是获取任务并执行任务,若线程池关闭,则等待任务队列的任务执行完成并退出。

    final void runWorker(WorkQueue w) {
        w.growArray();                   // allocate queue
        int seed = w.hint;               // initially holds randomization hint
        int r = (seed == 0) ? 1 : seed;  // avoid 0 for xorShift
        for (ForkJoinTask<?> t;;) {
            //扫描任务
            if ((t = scan(w, r)) != null)
                //工作线程执行任务
                w.runTask(t);
                //没有任务执行则等待
            else if (!awaitWork(w, r))
                break;
            //随机值更新    
            r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
        }
    }
    

    scan()扫描任务:

    private ForkJoinTask<?> scan(WorkQueue w, int r) {
        WorkQueue[] ws; int m;
        //线程池不为空?
        if ((ws = workQueues) != null && (m = ws.length - 1) > 0 && w != null) {
            int ss = w.scanState;                     // initially non-negative
            for (int origin = r & m, k = origin, oldSum = 0, checkSum = 0;;) {
                WorkQueue q; ForkJoinTask<?>[] a; ForkJoinTask<?> t;
                int b, n; long c;
                //槽位k不为空,尝试从该任务队列里获取任务
                if ((q = ws[k]) != null) {
                    //有任务
                    if ((n = (b = q.base) - q.top) < 0 &&
                        (a = q.array) != null) {      // non-empty
                        //数组地址
                        long i = (((a.length - 1) & b) << ASHIFT) + ABASE;
                        if ((t = ((ForkJoinTask<?>)
                                  U.getObjectVolatile(a, i))) != null &&
                            q.base == b) {
                            //获取任务并更新base等索引信息    
                            if (ss >= 0) {
                                if (U.compareAndSwapObject(a, i, t, null)) {
                                    q.base = b + 1;
                                    //通知其他线程
                                    if (n < -1)       // signal others
                                        signalWork(ws, q);
                                    return t;
                                }
                            }
                            //设置WorkQueue的状态
                            else if (oldSum == 0 &&   // try to activate
                                     w.scanState < 0)
                                tryRelease(c = ctl, ws[m & (int)c], AC_UNIT);
                        }
                        if (ss < 0)                   // refresh
                            ss = w.scanState;
                        r ^= r << 1; r ^= r >>> 3; r ^= r << 10;
                        origin = k = r & m;           // move and rescan
                        oldSum = checkSum = 0;
                        continue;
                    }
                    checkSum += b;
                }
                //未扫描到任务,准备inactive此工作队列
                if ((k = (k + 1) & m) == origin) {    // continue until stable
                    if ((ss >= 0 || (ss == (ss = w.scanState))) &&
                        oldSum == (oldSum = checkSum)) {
                        if (ss < 0 || w.qlock < 0)    // already inactive
                            break;
                        int ns = ss | INACTIVE;       // try to inactivate
                        long nc = ((SP_MASK & ns) |
                                   (UC_MASK & ((c = ctl) - AC_UNIT)));
                        w.stackPred = (int)c;         // hold prev stack top
                        U.putInt(w, QSCANSTATE, ns);
                        if (U.compareAndSwapLong(this, CTL, c, nc))
                            ss = ns;
                        else
                            w.scanState = ss;         // back out
                    }
                    checkSum = 0;
                }
            }
        }
        return null;
    }
    

    awaitWork()为等待任务。若工作线程未获取到任务,则会执行此方法。

    private boolean awaitWork(WorkQueue w, int r) {
            if (w == null || w.qlock < 0) // w已经终止,返回false,不再扫描任务
                return false;
            for (int pred = w.stackPred, spins = SPINS, ss;;) { 
                if ((ss = w.scanState) >= 0) // 如果已经active,跳出,返回true,继续扫描任务
                    break;
                else if (spins > 0) { // 如果spins > 0,自旋等待
                    r ^= r << 6;
                    r ^= r >>> 21;
                    r ^= r << 7;
                    if (r >= 0 && --spins == 0) { // 随机消耗自旋次数
                        WorkQueue v;
                        WorkQueue[] ws;
                        int s, j;
                        AtomicLong sc;
                        if (pred != 0 // 除了自己,还有等待的线程-工作队列
                                && (ws = workQueues) != null // 线程池还在
                                && (j = pred & SMASK) < ws.length // 前任索引还在池范围内
                                && (v = ws[j]) != null // 前任任务队列还在
                                && (v.parker == null || v.scanState >= 0)) // 前任线程已经唤醒,且工作队列已经激活
                            spins = SPINS; // 上面的一系列判断表明,很快就有任务了,先不park,继续自旋
                    }
                } else if (w.qlock < 0) // 自旋之后,再次检查工作队列是否终止,若是,退出扫描
                    return false;
                else if (!Thread.interrupted()) { // 如果线程中断了,清除中断标记,不考虑park,否则进入该分支
                    long c, prevctl, parkTime, deadline;
                    int ac = (int) ((c = ctl) >> AC_SHIFT) + (config & SMASK); // 计算活跃线程的个数
                    if ((ac <= 0 && tryTerminate(false, false)) || (runState & STOP) != 0) // 线程池正在终止,退出扫描
                        return false;
                    if (ac <= 0 && ss == (int) c) { // 自己是栈顶等待者
                        prevctl = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & pred); // 设置为前一次的ctl
                        int t = (short) (c >>> TC_SHIFT); // 总的线程数
                        if (t > 2 && U.compareAndSwapLong(this, CTL, c, prevctl)) // 总线程数过多,直接退出扫描
                            return false;
                        parkTime = IDLE_TIMEOUT * ((t >= 0) ? 1 : 1 - t); // 计算等待时间
                        deadline = System.nanoTime() + parkTime - TIMEOUT_SLOP;
                    } else
                        prevctl = parkTime = deadline = 0L;
                    Thread wt = Thread.currentThread();
                    U.putObject(wt, PARKBLOCKER, this); // 加锁
                    w.parker = wt; // 设置parker,准备阻塞
                    if (w.scanState < 0 && ctl == c) // 阻塞前再次检查状态
                        U.park(false, parkTime);
                    U.putOrderedObject(w, QPARKER, null); // 唤醒后,置空parker
                    U.putObject(wt, PARKBLOCKER, null); // 解锁
                    if (w.scanState >= 0) // 已激活,跳出继续扫描
                        break;
                    if (parkTime != 0L && ctl == c && deadline - System.nanoTime() <= 0L
                            && U.compareAndSwapLong(this, CTL, c, prevctl)) // 超时,未等到任务,跳出,不再执行扫描任务,削减工作线程
                        return false; // shrink pool
                }
            }
            return true;
        }
    

    相关文章

      网友评论

          本文标题:java并发编程(4):ForkJoinPool框架源码详解

          本文链接:https://www.haomeiwen.com/subject/umaavctx.html