美文网首页程序员Java 并发
【Java 并发笔记】Fork/Join 框架相关整理(下)

【Java 并发笔记】Fork/Join 框架相关整理(下)

作者: 58bc06151329 | 来源:发表于2019-01-17 17:41 被阅读3次

    文前说明

    作为码农中的一员,需要不断的学习,我工作之余将一些分析总结和学习笔记写成博客与大家一起交流,也希望采用这种方式记录自己的学习之旅。

    本文仅供学习交流使用,侵权必删。
    不用于商业目的,转载请注明出处。

    接上一篇 【Java 并发笔记】Fork/Join 框架相关整理(上)

    ForkJoinTask 对象

    ForkJoinTask
    • ForkJoinTask 实现了 Future 接口,说明它也是一个可取消的异步运算任务,实际上 ForkJoinTask 是 Future 的轻量级实现,主要用在纯粹是计算的函数式任务或者操作完全独立的对象计算任务。
      • fork 是主运行方法,用于异步执行。
      • join 方法在任务结果计算完毕之后才会运行,用来合并或返回计算结果。
      • ExceptionNode 是用于存储任务执行期间的异常信息的单向链表。
      • 其余四个类是为 Runnable/Callable 任务提供的适配器类,用于把 Runnable/Callable 转化为 ForkJoinTask 类型的任务(因为 ForkJoinPool 只可以运行 ForkJoinTask 类型的任务)。
    方法名称 说明
    final ForkJoinTask<V> fork() 向线程池提交任务,返回 this。
    final V join() 获取任务执行结果,如果仍然尚未执行那么会立刻执行任务。
    final V invoke() 通过当前线程直接执行任务。
    boolean cancel(boolean) 取消任务,boolean 参数可以随意指定,没有作用。
    final boolean isDone() 返回任务是否完成。
    final boolean isCancelled() 返回任务是否被取消。
    final boolean isCompletedAbnormally() 判断任务是不是非正常执行完成的(被取消或是抛出未捕获异常)。
    final boolean isCompletedNormally() 判断任务是否正常执行完成。
    final Throwable getException() 获得任务执行期间抛出的未捕获异常(如果有的话)。
    void completeExceptionally(Throwable) 当任务执行期间抛出指定的异常时忽略。
    void complete(V) 设置任务完成后的值,并强制将任务设为正常完成。
    final void quietlyComplete() 强制将任务设置为已正常完成。
    final V get() 获得任务执行的结果,若尚未执行结束则阻塞。
    final V get(long,TimeUnit) 获得任务执行的结果,若尚未执行结束则阻塞至多指定的时间。
    final void quietlyJoin() 快速 Join 任务而不获得执行结果。
    final void quietlyInvoke() 直接执行任务。
    void reinitialize() 重置任务,恢复到初始状态。
    boolean tryUnfork() 尝试从线程池中取出该任务。
    final short getForkJoinTaskTag() 返回此任务的标签值。
    final short setForkJoinTaskTag(short) 设置此任务的标签值。
    final boolean compareAndSetForkJoinTaskTag(short, short) 通过 CAS 将任务标签值由第一个参数设为第二个参数。

    任务状态

    • ForkJoinTask 仅有一个实例变量 status,是一个 volatile 修饰的 int 变量。
      • 当任务执行结束时,status 变量会小于 0。
    常量名 常量值 作用
    DONE_MASK 0xF0000000 表示保存任务状态的位。
    NORMAL 0xF0000000 当前任务已完成。
    CANCELLED 0xC0000000 当前任务被取消。
    EXCEPTIONAL 0x80000000 当前任务抛出未捕获的异常。
    SIGNAL 0x00010000 标记位。
    SMASK 0x0000FFFF 任务的标签数。
    /** The run status of this task */
    volatile int status; // 默认等于0
    static final int DONE_MASK   = 0xf0000000;  // 小于0表示任务已经执行过,大于0说明任务没执行完
    // NORMAL,CANCELLED,EXCEPTIONAL均小于0
    static final int NORMAL      = 0xf0000000;  // must be negative
    static final int CANCELLED   = 0xc0000000;  // must be < NORMAL
    static final int EXCEPTIONAL = 0x80000000;  // must be < CANCELLED
    static final int SIGNAL      = 0x00010000;  // must be >= 1 << 16
    static final int SMASK       = 0x0000ffff;  // short bits for tags
    
    ForkJoinTask 状态
    • 提供了以下方法查询任务当前的状态。
    isCancelled() // CANCELLED
    isCompletedAbnormally // status < NORMAL => CANCELLED || EXCEPTIONAL
    isCompletedNormally // NORMAL
    isDone() // status < 0 => NORMAL || CANCELLED || EXCEPTIONAL
    

    3.2.2 任务提交过程

    • ForkJoinPool 中的任务执行分两种:
      • 直接通过 FJP 提交的外部任务(external/submissions task),存放在 workQueues 的偶数槽位。
      • 通过内部 fork 分割的子任务(Worker task),存放在 workQueues 的奇数槽位。

    3.2.2.1 外部任务(external/submissions task)提交

    • 向 ForkJoinPool 提交任务有三种方式。
      • invoke() 会等待任务计算完毕并返回计算结果。
      • execute() 是直接向池提交一个任务来异步执行,无返回结果。
      • submit() 也是异步执行,但是会返回提交的任务,在适当的时候可通过 task.get() 获取执行结果。
    • ForkJoinPool 提供的提交接口很多,不管提交的是 Callable、Runnable、ForkJoinTask 最终都会转换成 ForkJoinTask 类型的任务,调用方法 externalPush(ForkJoinTask<?> task) 来进行提交逻辑。
    // 提交没有返回值的任务
    public void execute(ForkJoinTask<?> task) {
        if (task == null)
            throw new NullPointerException();
        externalPush(task);
    }
    public void execute(Runnable task) {
        if (task == null)
            throw new NullPointerException();
        ForkJoinTask<?> job;
        if (task instanceof ForkJoinTask<?>) // 避免二次包装
            job = (ForkJoinTask<?>) task;
        else
            job = new ForkJoinTask.RunnableExecuteAction(task); // 包装成ForkJoinTask
        externalPush(job);
    }
    // 提交有返回值的任务
    public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
        if (task == null)
            throw new NullPointerException();
        externalPush(task);
        return task;
    }
    public <T> ForkJoinTask<T> submit(Callable<T> task) {
        // 包装成ForkJoinTask
        ForkJoinTask<T> job = new ForkJoinTask.AdaptedCallable<T>(task);
        externalPush(job);
        return job;
    }
    public <T> ForkJoinTask<T> submit(Runnable task, T result) {
        // 包装成ForkJoinTask
        ForkJoinTask<T> job = new ForkJoinTask.AdaptedRunnable<T>(task, result);
        externalPush(job);
        return job;
    }
    public ForkJoinTask<?> submit(Runnable task) {
        if (task == null)
            throw new NullPointerException();
        ForkJoinTask<?> job;
        if (task instanceof ForkJoinTask<?>) // 避免二次包装
            job = (ForkJoinTask<?>) task;
        else
            job = new ForkJoinTask.AdaptedRunnableAction(task); // 包装成ForkJoinTask
        externalPush(job);
        return job;
    }
    // 同步提交,阻塞等结果
    public <T> T invoke(ForkJoinTask<T> task) {
        if (task == null)
            throw new NullPointerException();
        externalPush(task);
        return task.join(); // 等待任务完成
    }
    

    externalPush 方法

    • externalPush()externalSubmit() 的作用都是把任务放到队列中等待执行。不同的是,externalSubmit() 是完整版的 externalPush(),在任务首次提交时,需要初始化 workQueues 及其他相关属性,这个初始化操作就是 externalSubmit() 来完成的。而后再向池中提交的任务都是通过简化版的 externalPush() 来完成。
    • externalPush 的执行流程。
      • 首先找到一个随机偶数槽位的 workQueue。
      • 然后把任务放入这个 workQueue 的任务数组中,并更新 top 位。
      • 如果队列的剩余任务数小于 1,则尝试创建或激活一个工作线程来运行任务(防止在 externalSubmit() 初始化时发生异常导致工作线程创建失败)。
    //添加给定任务到submission队列中
    final void externalPush(ForkJoinTask<?> task) {
        WorkQueue[] ws;
        WorkQueue q;
        int m;
        int r = ThreadLocalRandom.getProbe();//探针值,用于计算WorkQueue槽位索引
        int rs = runState;
        if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&
                (q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 && //获取随机偶数槽位的workQueue
                U.compareAndSwapInt(q, QLOCK, 0, 1)) {//锁定workQueue
            ForkJoinTask<?>[] a;
            int am, n, s;
            if ((a = q.array) != null &&
                    (am = a.length - 1) > (n = (s = q.top) - q.base)) {
                int j = ((am & s) << ASHIFT) + ABASE;//计算任务索引位置
                U.putOrderedObject(a, j, task);//任务入列
                U.putOrderedInt(q, QTOP, s + 1);//更新push slot
                U.putIntVolatile(q, QLOCK, 0);//解除锁定
                if (n <= 1)
                    signalWork(ws, q);//任务数小于1时尝试创建或激活一个工作线程
                return;
            }
            U.compareAndSwapInt(q, QLOCK, 1, 0);//解除锁定
        }
        externalSubmit(task);//初始化workQueues及相关属性
    }
    
    • externalSubmit 主要用于第一次提交任务时初始化workQueues及相关属性,并且提交给定任务到队列中,具体执行步骤如下。
      1. 如果池为终止状态(runState<0),调用 tryTerminate 终止线程池,并抛出任务拒绝异常。
      2. 如果尚未初始化,就为 FJP 执行初始化操作,初始化 stealCounter、创建 workerQueues,然后继续自旋。
      3. 初始化完成后,执行在 externalPush() 中相同的操作。获取 workQueue,放入指定任务。任务提交成功后调用 signalWork() 方法创建或激活线程。
      4. 如果在步骤 3 中获取到的 workQueue 为 null,会在这一步中创建一个
        workQueue,创建成功继续自旋执行第 3 步操作。
      5. 如果非上述情况,或者有线程争用资源导致获取锁失败,就重新获取线程探针值继续自旋。

    signalWork 方法

    • 新建或唤醒一个工作线程,在 externalPush()externalSubmit()workQueue.push()scan() 中调用。
      • 如果还有空闲线程,则尝试唤醒索引到的 WorkQueue 的 parker 线程。
      • 如果工作线程过少 (ctl & ADD_WORKER) != 0L,则调用 tryAddWorker() 添加一个新的工作线程。
    final void signalWork(WorkQueue[] ws, WorkQueue q) {
        long c;
        int sp, i;
        WorkQueue v;
        Thread p;
        while ((c = ctl) < 0L) {                       // too few active
            if ((sp = (int) c) == 0) {                  // no idle workers
                if ((c & ADD_WORKER) != 0L)            // too few workers
                    tryAddWorker(c);//工作线程太少,添加新的工作线程
                break;
            }
            if (ws == null)                            // unstarted/terminated
                break;
            if (ws.length <= (i = sp & SMASK))         // terminated
                break;
            if ((v = ws[i]) == null)                   // terminating
                break;
            //计算ctl,加上版本戳SS_SEQ避免ABA问题
            int vs = (sp + SS_SEQ) & ~INACTIVE;        // next scanState
            int d = sp - v.scanState;                  // screen CAS
            //计算活跃线程数(高32位)并更新为下一个栈顶的scanState(低32位)
            long nc = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & v.stackPred);
            if (d == 0 && U.compareAndSwapLong(this, CTL, c, nc)) {
                v.scanState = vs;                      // activate v
                if ((p = v.parker) != null)
                    U.unpark(p);//唤醒阻塞线程
                break;
            }
            if (q != null && q.base == q.top)          // no more work
                break;
        }
    }
    

    tryAddWorker 方法

    • 尝试添加一个新的工作线程,首先更新 ctl 中的工作线程数。
      • AC 加 1,同时 TC 也加 1。
      • 然后调用 createWorker() 创建工作线程。
    private void tryAddWorker(long c) {
        boolean add = false;
        do {
            long nc = ((AC_MASK & (c + AC_UNIT)) |
                       (TC_MASK & (c + TC_UNIT)));
            if (ctl == c) {
                int rs, stop;                 // check if terminating
                if ((stop = (rs = lockRunState()) & STOP) == 0)
                    add = U.compareAndSwapLong(this, CTL, c, nc);
                unlockRunState(rs, rs & ~RSLOCK);//释放锁
                if (stop != 0)
                    break;
                if (add) {
                    createWorker();//创建工作线程
                    break;
                }
            }
        } while (((c = ctl) & ADD_WORKER) != 0L && (int)c == 0);
    }
    

    createWorker 方法

    • createWorker 首先通过线程工厂创一个新的 ForkJoinWorkerThread,然后启动这个工作线程 wt.start()。如果期间发生异常,调用 deregisterWorker 处理线程创建失败的逻辑。
      • ForkJoinWorkerThread 在构造时首先调用父类 Thread 的方法,然后为工作线程注册 pool 和 workQueue,而 workQueue 的注册任务由 ForkJoinPool.registerWorker 来完成。
    • createWorker() 中启动工作线程后 wt.start(),当为线程分配到 CPU 执行时间片之后会运行 ForkJoinWorkerThread 的 run() 方法开启线程来执行任务。
    private boolean createWorker() {
        ForkJoinWorkerThreadFactory fac = factory;
        Throwable ex = null;
        ForkJoinWorkerThread wt = null;
        try {
            if (fac != null && (wt = fac.newThread(this)) != null) {
                wt.start();
                return true;
            }
        } catch (Throwable rex) {
            ex = rex;
        }
        deregisterWorker(wt, ex);//线程创建失败处理
        return false;
    }
    

    registerWorker 方法

    • registerWorker 是 ForkJoinWorkerThread 构造器的回调函数,用于创建和记录工作线程的 WorkQueue。
      • 在此为工作线程创建的 WorkQueue 是放在 奇数 索引的 i = ((s << 1) | 1) & m
    final WorkQueue registerWorker(ForkJoinWorkerThread wt) {
        UncaughtExceptionHandler handler;
        //设置为守护线程
        wt.setDaemon(true);                           // configure thread
        if ((handler = ueh) != null)
            wt.setUncaughtExceptionHandler(handler);
        WorkQueue w = new WorkQueue(this, wt);//构造新的WorkQueue
        int i = 0;                                    // assign a pool index
        int mode = config & MODE_MASK;
        int rs = lockRunState();
        try {
            WorkQueue[] ws;
            int n;                    // skip if no array
            if ((ws = workQueues) != null && (n = ws.length) > 0) {
                //生成新建WorkQueue的索引
                int s = indexSeed += SEED_INCREMENT;  // unlikely to collide
                int m = n - 1;
                i = ((s << 1) | 1) & m;               // Worker任务放在奇数索引位 odd-numbered indices
                if (ws[i] != null) {                  // collision 已存在,重新计算索引位
                    int probes = 0;                   // step by approx half n
                    int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2;
                    //查找可用的索引位
                    while (ws[i = (i + step) & m] != null) {
                        if (++probes >= n) {//所有索引位都被占用,对workQueues进行扩容
                            workQueues = ws = Arrays.copyOf(ws, n <<= 1);//workQueues 扩容
                            m = n - 1;
                            probes = 0;
                        }
                    }
                }
                w.hint = s;                           // use as random seed
                w.config = i | mode;
                w.scanState = i;                      // publication fence
                ws[i] = w;
            }
        } finally {
            unlockRunState(rs, rs & ~RSLOCK);
        }
        wt.setName(workerNamePrefix.concat(Integer.toString(i >>> 1)));
        return w;
    }
    

    3.2.2.2 子任务(Worker task)提交

    • 由任务的 fork() 方法完成。
    • 任务被分割 Fork 之后调用了 ForkJoinPool.WorkQueue.push() 方法直接把任务放到队列中等待被执行。
    • 如果当前线程是 Worker 线程,说明当前任务是 Fork 分割的子任务,通过ForkJoinPool.workQueue.push() 方法直接把任务放到自己的等待队列中。
      • 否则调用 ForkJoinPool.externalPush() 提交到一个随机的等待队列中(外部任务)。

    push 方法

    final void push(ForkJoinTask<?> task) {
        ForkJoinTask<?>[] a;
        ForkJoinPool p;
        int b = base, s = top, n;
        if ((a = array) != null) {    // ignore if queue removed
            int m = a.length - 1;     // fenced write for task visibility
            U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);
            U.putOrderedInt(this, QTOP, s + 1);
            if ((n = s - b) <= 1) {//首次提交,创建或唤醒一个工作线程
                if ((p = pool) != null)
                    p.signalWork(p.workQueues, this);
            } else if (n >= m)
                growArray();
        }
    }
    
    • 首先把任务放入等待队列并更新 top 位。
    • 如果当前 WorkQueue 为新建的等待队列 top-base<=1,则调用 signalWork() 方法为当前 WorkQueue 新建或唤醒一个工作线程。
    • 如果 WorkQueue 中的任务数组容量过小,则调用 growArray() 方法对其进行 两倍 扩容。

    growArray 方法

    final ForkJoinTask<?>[] growArray() {
        ForkJoinTask<?>[] oldA = array;//获取内部任务列表
        int size = oldA != null ? oldA.length << 1 : INITIAL_QUEUE_CAPACITY;
        if (size > MAXIMUM_QUEUE_CAPACITY)
            throw new RejectedExecutionException("Queue capacity exceeded");
        int oldMask, t, b;
        //新建一个两倍容量的任务数组
        ForkJoinTask<?>[] a = array = new ForkJoinTask<?>[size];
        if (oldA != null && (oldMask = oldA.length - 1) >= 0 &&
                (t = top) - (b = base) > 0) {
            int mask = size - 1;
            //从老数组中拿出数据,放到新的数组中
            do { // emulate poll from old array, push to new array
                ForkJoinTask<?> x;
                int oldj = ((b & oldMask) << ASHIFT) + ABASE;
                int j = ((b & mask) << ASHIFT) + ABASE;
                x = (ForkJoinTask<?>) U.getObjectVolatile(oldA, oldj);
                if (x != null &&
                        U.compareAndSwapObject(oldA, oldj, x, null))
                    U.putObjectVolatile(a, j, x);
            } while (++b != t);
        }
        return a;
    }
    

    3.2.3 任务执行过程

    • ForkJoinPool.createWorker() 方法中创建工作线程后,会启动工作线程,系统为工作线程分配到 CPU 执行时间片之后会执行 ForkJoinWorkerThread 的 run() 方法正式开始执行任务。

    run 方法

    • 在工作线程运行前后会调用自定义钩子函数 onStart()onTermination(),任务的运行则是调用了 ForkJoinPool.runWorker()
      • 如果全部任务执行完毕或者期间遭遇异常,则通过 ForkJoinPool.deregisterWorker 关闭工作线程并处理异常信息。
    public void run() {
        if (workQueue.array == null) { // only run once
            Throwable exception = null;
            try {
                onStart();//钩子方法,可自定义扩展
                pool.runWorker(workQueue);
            } catch (Throwable ex) {
                exception = ex;
            } finally {
                try {
                    onTermination(exception);//钩子方法,可自定义扩展
                } catch (Throwable ex) {
                    if (exception == null)
                        exception = ex;
                } finally {
                    pool.deregisterWorker(this, exception);//处理异常
                }
            }
        }
    }
    

    runWorker 方法

    • runWorker是 ForkJoinWorkerThread 的主运行方法,用来依次执行当前工作线程中的任务。
      • 调用 scan 方法依次获取任务,然后调用 WorkQueue.runTask 运行任务。
      • 如果未扫描到任务,则调用 awaitWork 等待,直到工作线程/线程池终止或等待超时。
    final void runWorker(WorkQueue w) {
        w.growArray();                   // allocate queue
        int seed = w.hint;               // initially holds randomization hint
        int r = (seed == 0) ? 1 : seed;  // avoid 0 for xorShift
        for (ForkJoinTask<?> t; ; ) {
            if ((t = scan(w, r)) != null)//扫描任务执行
                w.runTask(t);
            else if (!awaitWork(w, r))
                break;
            r ^= r << 13;
            r ^= r >>> 17;
            r ^= r << 5; // xorshift
        }
    }
    

    scan 方法

    • 扫描并尝试偷取一个任务。使用 w.hint 进行随机索引 WorkQueue,并不一定会执行当前 WorkQueue 中的任务,而是偷取别的 Worker 的任务来执行。
    • 执行流程。
      1. 取随机位置的一个 WorkQueue。
      2. 获取 base 位的 ForkJoinTask,成功取到后更新 base 位并返回任务。
        • 如果取到的 WorkQueue 中任务数大于 1,则调用 signalWork() 创建或唤醒其他工作线程。
      3. 如果当前工作线程处于不活跃状态(INACTIVE),则调用 tryRelease() 尝试唤醒栈顶工作线程来执行。
    private ForkJoinTask<?> scan(WorkQueue w, int r) {
        WorkQueue[] ws;
        int m;
        if ((ws = workQueues) != null && (m = ws.length - 1) > 0 && w != null) {
            int ss = w.scanState;                     // initially non-negative
            //初始扫描起点,自旋扫描
            for (int origin = r & m, k = origin, oldSum = 0, checkSum = 0; ; ) {
                WorkQueue q;
                ForkJoinTask<?>[] a;
                ForkJoinTask<?> t;
                int b, n;
                long c;
                if ((q = ws[k]) != null) {//获取workQueue
                    if ((n = (b = q.base) - q.top) < 0 &&
                            (a = q.array) != null) {      // non-empty
                        //计算偏移量
                        long i = (((a.length - 1) & b) << ASHIFT) + ABASE;
                        if ((t = ((ForkJoinTask<?>)
                                U.getObjectVolatile(a, i))) != null && //取base位置任务
                                q.base == b) {//stable
                            if (ss >= 0) {  //scanning
                                if (U.compareAndSwapObject(a, i, t, null)) {//
                                    q.base = b + 1;//更新base位
                                    if (n < -1)       // signal others
                                        signalWork(ws, q);//创建或唤醒工作线程来运行任务
                                    return t;
                                }
                            } else if (oldSum == 0 &&   // try to activate 尝试激活工作线程
                                    w.scanState < 0)
                                tryRelease(c = ctl, ws[m & (int) c], AC_UNIT);//唤醒栈顶工作线程
                        }
                        //base位置任务为空或base位置偏移,随机移位重新扫描
                        if (ss < 0)                   // refresh
                            ss = w.scanState;
                        r ^= r << 1;
                        r ^= r >>> 3;
                        r ^= r << 10;
                        origin = k = r & m;           // move and rescan
                        oldSum = checkSum = 0;
                        continue;
                    }
                    checkSum += b;//队列任务为空,记录base位
                }
                //更新索引k 继续向后查找
                if ((k = (k + 1) & m) == origin) {    // continue until stable
                    //运行到这里说明已经扫描了全部的 workQueues,但并未扫描到任务
     
                    if ((ss >= 0 || (ss == (ss = w.scanState))) &&
                            oldSum == (oldSum = checkSum)) {
                        if (ss < 0 || w.qlock < 0)    // already inactive
                            break;// 已经被灭活或终止,跳出循环
     
                        //对当前WorkQueue进行灭活操作
                        int ns = ss | INACTIVE;       // try to inactivate
                        long nc = ((SP_MASK & ns) |
                                (UC_MASK & ((c = ctl) - AC_UNIT)));//计算ctl为INACTIVE状态并减少活跃线程数
                        w.stackPred = (int) c;         // hold prev stack top
                        U.putInt(w, QSCANSTATE, ns);//修改scanState为inactive状态
                        if (U.compareAndSwapLong(this, CTL, c, nc))//更新scanState为灭活状态
                            ss = ns;
                        else
                            w.scanState = ss;         // back out
                    }
                    checkSum = 0;//重置checkSum,继续循环
                }
            }
        }
        return null;
    }
    

    tryRelease 方法

    • 如果 base 位任务为空或发生偏移,则对索引位进行随机移位,然后重新扫描。
    • 如果扫描整个 workQueues 之后没有获取到任务,则设置当前工作线程为 INACTIVE 状态。
      • 然后重置 checkSum,再次扫描一圈之后如果还没有任务则跳出循环返回 null。
    private boolean tryRelease(long c, WorkQueue v, long inc) {
        int sp = (int) c, vs = (sp + SS_SEQ) & ~INACTIVE;
        Thread p;
        //ctl低32位等于scanState,说明可以唤醒parker线程
        if (v != null && v.scanState == sp) {          // v is at top of stack
            //计算活跃线程数(高32位)并更新为下一个栈顶的scanState(低32位)
            long nc = (UC_MASK & (c + inc)) | (SP_MASK & v.stackPred);
            if (U.compareAndSwapLong(this, CTL, c, nc)) {
                v.scanState = vs;
                if ((p = v.parker) != null)
                    U.unpark(p);//唤醒线程
                return true;
            }
        }
        return false;
    }
    

    awaitWork 方法

    • 如果 scan() 方法未扫描到任务,会调用 awaitWork() 等待获取任务。
      • 在等待获取任务期间,如果工作线程或线程池已经终止则直接返回 false。
      • 如果当前无 active 线程,尝试终止线程池并返回 false。
      • 如果终止失败并且当前是最后一个等待的 Worker,就阻塞指定的时间(IDLE_TIMEOUT)。
      • 等到届期或被唤醒后如果发现自己是 scanning(scanState >= 0)状态,说明已经等到任务,跳出等待返回 true 继续 scan(),否则的更新 ctl 并返回 false。
    private boolean awaitWork(WorkQueue w, int r) {
        if (w == null || w.qlock < 0)                 // w is terminating
            return false;
        for (int pred = w.stackPred, spins = SPINS, ss; ; ) {
            if ((ss = w.scanState) >= 0)//正在扫描,跳出循环
                break;
            else if (spins > 0) {
                r ^= r << 6;
                r ^= r >>> 21;
                r ^= r << 7;
                if (r >= 0 && --spins == 0) {         // randomize spins
                    WorkQueue v;
                    WorkQueue[] ws;
                    int s, j;
                    AtomicLong sc;
                    if (pred != 0 && (ws = workQueues) != null &&
                            (j = pred & SMASK) < ws.length &&
                            (v = ws[j]) != null &&        // see if pred parking
                            (v.parker == null || v.scanState >= 0))
                        spins = SPINS;                // continue spinning
                }
            } else if (w.qlock < 0)                     // 当前workQueue已经终止,返回false recheck after spins
                return false;
            else if (!Thread.interrupted()) {//判断线程是否被中断,并清除中断状态
                long c, prevctl, parkTime, deadline;
                int ac = (int) ((c = ctl) >> AC_SHIFT) + (config & SMASK);//活跃线程数
                if ((ac <= 0 && tryTerminate(false, false)) || //无active线程,尝试终止
                        (runState & STOP) != 0)           // pool terminating
                    return false;
                if (ac <= 0 && ss == (int) c) {        // is last waiter
                    //计算活跃线程数(高32位)并更新为下一个栈顶的scanState(低32位)
                    prevctl = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & pred);
                    int t = (short) (c >>> TC_SHIFT);  // shrink excess spares
                    if (t > 2 && U.compareAndSwapLong(this, CTL, c, prevctl))//总线程过量
                        return false;                 // else use timed wait
                    //计算空闲超时时间
                    parkTime = IDLE_TIMEOUT * ((t >= 0) ? 1 : 1 - t);
                    deadline = System.nanoTime() + parkTime - TIMEOUT_SLOP;
                } else
                    prevctl = parkTime = deadline = 0L;
                Thread wt = Thread.currentThread();
                U.putObject(wt, PARKBLOCKER, this);   // emulate LockSupport
                w.parker = wt;//设置parker,准备阻塞
                if (w.scanState < 0 && ctl == c)      // recheck before park
                    U.park(false, parkTime);//阻塞指定的时间
     
                U.putOrderedObject(w, QPARKER, null);
                U.putObject(wt, PARKBLOCKER, null);
                if (w.scanState >= 0)//正在扫描,说明等到任务,跳出循环
                    break;
                if (parkTime != 0L && ctl == c &&
                        deadline - System.nanoTime() <= 0L &&
                        U.compareAndSwapLong(this, CTL, c, prevctl))//未等到任务,更新ctl,返回false
                    return false;                     // shrink pool
            }
        }
        return true;
    }
    

    runTask 方法

    • scan() 方法扫描到任务之后,调用 WorkQueue.runTask() 来执行获取到的任务。
      1. 标记 scanState 为正在执行状态。
      2. 更新 currentSteal 为当前获取到的任务并执行它,任务的执行调用了 ForkJoinTask.doExec() 方法。
    final void runTask(ForkJoinTask<?> task) {
        if (task != null) {
            scanState &= ~SCANNING; // mark as busy
            (currentSteal = task).doExec();//更新currentSteal并执行任务
            U.putOrderedObject(this, QCURRENTSTEAL, null); // release for GC
            execLocalTasks();//依次执行本地任务
            ForkJoinWorkerThread thread = owner;
            if (++nsteals < 0)      // collect on overflow
                transferStealCount(pool);//增加偷取任务数
            scanState |= SCANNING;
            if (thread != null)
                thread.afterTopLevelExec();//执行钩子函数
        }
    }
    

    doExec 方法

    • 调用 execLocalTasks() 依次执行当前 WorkerQueue 中的任务。
    //ForkJoinTask.doExec()
    final int doExec() {
        int s; boolean completed;
        if ((s = status) >= 0) {
            try {
                completed = exec();//执行我们定义的任务
            } catch (Throwable rex) {
                return setExceptionalCompletion(rex);
            }
            if (completed)
                s = setCompletion(NORMAL);
        }
        return s;
    }
    

    execLocalTasks 方法

    • 更新偷取任务数。
    • 还原 scanState 并执行钩子函数。
    //执行并移除所有本地任务
    final void execLocalTasks() {
        int b = base, m, s;
        ForkJoinTask<?>[] a = array;
        if (b - (s = top - 1) <= 0 && a != null &&
                (m = a.length - 1) >= 0) {
            if ((config & FIFO_QUEUE) == 0) {//FIFO模式
                for (ForkJoinTask<?> t; ; ) {
                    if ((t = (ForkJoinTask<?>) U.getAndSetObject
                            (a, ((m & s) << ASHIFT) + ABASE, null)) == null)//FIFO执行,取top任务
                        break;
                    U.putOrderedInt(this, QTOP, s);
                    t.doExec();//执行
                    if (base - (s = top - 1) > 0)
                        break;
                }
            } else
                pollAndExecAll();//LIFO模式执行,取base任务
        }
    }
    

    deregisterWorker 方法

    • deregisterWorker 方法用于工作线程运行完毕之后终止线程或处理工作线程异常,主要就是清除已关闭的工作线程或回滚创建线程之前的操作,并把传入的异常抛给
      ForkJoinTask 来处理。
    final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {
        WorkQueue w = null;
        //1.移除workQueue
        if (wt != null && (w = wt.workQueue) != null) {//获取ForkJoinWorkerThread的等待队列
            WorkQueue[] ws;                           // remove index from array
            int idx = w.config & SMASK;//计算workQueue索引
            int rs = lockRunState();//获取runState锁和当前池运行状态
            if ((ws = workQueues) != null && ws.length > idx && ws[idx] == w)
                ws[idx] = null;//移除workQueue
            unlockRunState(rs, rs & ~RSLOCK);//解除runState锁
        }
        //2.减少CTL数
        long c;                                       // decrement counts
        do {} while (!U.compareAndSwapLong
                     (this, CTL, c = ctl, ((AC_MASK & (c - AC_UNIT)) |
                                           (TC_MASK & (c - TC_UNIT)) |
                                           (SP_MASK & c))));
        //3.处理被移除workQueue内部相关参数
        if (w != null) {
            w.qlock = -1;                             // ensure set
            w.transferStealCount(this);
            w.cancelAll();                            // cancel remaining tasks
        }
        //4.如果线程未终止,替换被移除的workQueue并唤醒内部线程
        for (;;) {                                    // possibly replace
            WorkQueue[] ws; int m, sp;
            //尝试终止线程池
            if (tryTerminate(false, false) || w == null || w.array == null ||
                (runState & STOP) != 0 || (ws = workQueues) == null ||
                (m = ws.length - 1) < 0)              // already terminating
                break;
            //唤醒被替换的线程,依赖于下一步
            if ((sp = (int)(c = ctl)) != 0) {         // wake up replacement
                if (tryRelease(c, ws[sp & m], AC_UNIT))
                    break;
            }
            //创建工作线程替换
            else if (ex != null && (c & ADD_WORKER) != 0L) {
                tryAddWorker(c);                      // create replacement
                break;
            }
            else                                      // don't need replacement
                break;
        }
        //5.处理异常
        if (ex == null)                               // help clean on way out
            ForkJoinTask.helpExpungeStaleExceptions();
        else                                          // rethrow
            ForkJoinTask.rethrow(ex);
    }
    

    3.2.4 获取任务结果过程

    • join() 方法一把是在任务 fork() 之后调用,用来获取(或者叫“合并”)任务的执行结果。
    • ForkJoinTask 的 join()invoke() 方法都可以用来获取任务的执行结果(另外还有 get() 方法也是调用了 doJoin() 来获取任务结果,但是会响应运行时异常),它们对外部提交任务的执行方式一致,都是通过 externalAwaitDone() 方法等待执行结果。
      • invoke() 方法会直接执行当前任务。
      • join() 方法则是在当前任务在队列 top 位时(通过 tryUnpush() 方法判断)才能执行,如果当前任务不在 top 位或者任务执行失败调用 ForkJoinPool.awaitJoin() 方法帮助执行或阻塞当前 Join 任务。

    join 方法

    //合并任务结果
    public final V join() {
        int s;
        if ((s = doJoin() & DONE_MASK) != NORMAL)
            reportException(s);
        return getRawResult();
    }
     
    //join, get, quietlyJoin的主实现方法
    private int doJoin() {
        int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
        return (s = status) < 0 ? s :
            ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
            (w = (wt = (ForkJoinWorkerThread)t).workQueue).
            tryUnpush(this) && (s = doExec()) < 0 ? s :
            wt.pool.awaitJoin(w, this, 0L) :
            externalAwaitDone();
    }
    

    invoke 方法

    //执行任务,并等待任务完成并返回结果
    public final V invoke() {
        int s;
        if ((s = doInvoke() & DONE_MASK) != NORMAL)
            reportException(s);
        return getRawResult();
    }
     
    //invoke, quietlyInvoke的主实现方法
    private int doInvoke() {
        int s; Thread t; ForkJoinWorkerThread wt;
        return (s = doExec()) < 0 ? s :
            ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
            (wt = (ForkJoinWorkerThread)t).pool.
            awaitJoin(wt.workQueue, this, 0L) :
            externalAwaitDone();
    }
    
    • 官方文档中建议对 ForkJoinTask 任务的调用顺序。
      • 一对 fork-join 操作一般按照如下顺序调用。
    a.fork(); 
    b.fork(); 
    b.join(); 
    a.join();
    
    • 因为任务 b 是后面进入队列,也就是说它是在栈顶的(top 位),在它 fork() 之后直接调用 join() 就可以直接执行而不会调用 ForkJoinPool.awaitJoin 方法去等待。
    join 执行流程

    externalAwaitDone 方法

    • 如果当前 join 为外部调用,则调用此方法执行任务,如果任务执行失败就进入等待。
    • 对不同的任务类型分两种情况。
      • 如果任务为 CountedCompleter 类型的任务,则调用 externalHelpComplete() 方法来执行任务。
      • 其他类型的 ForkJoinTask 任务调用 tryExternalUnpush 来执行。
    private int externalAwaitDone() {
        //执行任务
        int s = ((this instanceof CountedCompleter) ? // try helping
                 ForkJoinPool.common.externalHelpComplete(  // CountedCompleter任务
                     (CountedCompleter<?>)this, 0) :
                 ForkJoinPool.common.tryExternalUnpush(this) ? doExec() : 0);  // ForkJoinTask任务
        if (s >= 0 && (s = status) >= 0) {//执行失败,进入等待
            boolean interrupted = false;
            do {
                if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {  //更新state
                    synchronized (this) {
                        if (status >= 0) {//SIGNAL 等待信号
                            try {
                                wait(0L);
                            } catch (InterruptedException ie) {
                                interrupted = true;
                            }
                        }
                        else
                            notifyAll();
                    }
                }
            } while ((s = status) >= 0);
            if (interrupted)
                Thread.currentThread().interrupt();
        }
        return s;
    }
    

    tryExternalUnpush 方法

    • tryExternalUnpush 的作用就是判断当前任务是否在 top 位,如果是则弹出任务,然后在 externalAwaitDone 中调用 doExec() 执行任务。
    //为外部提交者提供 tryUnpush 功能(给定任务在top位时弹出任务)
    final boolean tryExternalUnpush(ForkJoinTask<?> task) {
        WorkQueue[] ws;
        WorkQueue w;
        ForkJoinTask<?>[] a;
        int m, s;
        int r = ThreadLocalRandom.getProbe();
        if ((ws = workQueues) != null && (m = ws.length - 1) >= 0 &&
                (w = ws[m & r & SQMASK]) != null &&
                (a = w.array) != null && (s = w.top) != w.base) {
            long j = (((a.length - 1) & (s - 1)) << ASHIFT) + ABASE;  //取top位任务
            if (U.compareAndSwapInt(w, QLOCK, 0, 1)) {  //加锁
                if (w.top == s && w.array == a &&
                        U.getObject(a, j) == task &&
                        U.compareAndSwapObject(a, j, task, null)) {  //符合条件,弹出
                    U.putOrderedInt(w, QTOP, s - 1);  //更新top
                    U.putOrderedInt(w, QLOCK, 0); //解锁,返回true
                    return true;
                }
                U.compareAndSwapInt(w, QLOCK, 1, 0);  //当前任务不在top位,解锁返回false
            }
        }
        return false;
    }
    

    awaitJoin 方法

    final int awaitJoin(WorkQueue w, ForkJoinTask<?> task, long deadline) {
        int s = 0;
        if (task != null && w != null) {
            ForkJoinTask<?> prevJoin = w.currentJoin;  //获取给定Worker的join任务
            U.putOrderedObject(w, QCURRENTJOIN, task);  //把currentJoin替换为给定任务
            //判断是否为CountedCompleter类型的任务
            CountedCompleter<?> cc = (task instanceof CountedCompleter) ?
                    (CountedCompleter<?>) task : null;
            for (; ; ) {
                if ((s = task.status) < 0)  //已经完成|取消|异常 跳出循环
                    break;
     
                if (cc != null)//CountedCompleter任务由helpComplete来完成join
                    helpComplete(w, cc, 0);
                else if (w.base == w.top || w.tryRemoveAndExec(task))  //尝试执行
                    helpStealer(w, task);  //队列为空或执行失败,任务可能被偷,帮助偷取者执行该任务
     
                if ((s = task.status) < 0) //已经完成|取消|异常,跳出循环
                    break;
                //计算任务等待时间
                long ms, ns;
                if (deadline == 0L)
                    ms = 0L;
                else if ((ns = deadline - System.nanoTime()) <= 0L)
                    break;
                else if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) <= 0L)
                    ms = 1L;
     
                if (tryCompensate(w)) {//执行补偿操作
                    task.internalWait(ms);//补偿执行成功,任务等待指定时间
                    U.getAndAddLong(this, CTL, AC_UNIT);//更新活跃线程数
                }
            }
            U.putOrderedObject(w, QCURRENTJOIN, prevJoin);//循环结束,替换为原来的join任务
        }
        return s;
    }
    
    • 如果当前 join 任务不在 Worker 等待队列的 top 位,或者任务执行失败,调用此方法来帮助执行或阻塞当前 join 的任务。函数执行流程如下。
      • 由于每次调用 awaitJoin 都会优先执行当前 join 的任务,所以首先会更新 currentJoin 为当前 join 任务。
      • 进入自旋。
        1. 首先检查任务是否已经完成(通过 task.status < 0 判断),如果给定任务执行完毕|取消|异常 则跳出循环返回执行状态 s。
        2. 如果是 CountedCompleter 任务类型,调用 helpComplete() 方法来完成 join 操作。
        3. 非 CountedCompleter 任务类型调用 WorkQueue.tryRemoveAndExec() 尝试执行任务。
        4. 如果给定 WorkQueue 的等待队列为空或任务执行失败,说明任务可能被偷,调用 helpStealer() 帮助偷取者执行任务(也就是说,偷取者帮我执行任务,我去帮偷取者执行它的任务)。
        5. 再次判断任务是否执行完毕(task.status < 0),如果任务执行失败,计算一个等待时间准备进行补偿操作。
        6. 调用 tryCompensate() 方法为给定 WorkQueue 尝试执行补偿操作。在执行补偿期间,如果发现 资源争用|池处于 unstable 状态|当前 Worker 已终止,则调用 ForkJoinTask.internalWait() 方法等待指定的时间,任务唤醒之后继续自旋。

    internalWait 方法

    final void internalWait(long timeout) {
        int s;
        if ((s = status) >= 0 && // force completer to issue notify
            U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {//更新任务状态为SIGNAL(等待唤醒)
            synchronized (this) {
                if (status >= 0)
                    try { wait(timeout); } catch (InterruptedException ie) { }
                else
                    notifyAll();
            }
        }
    }
    

    tryRemoveAndExec 方法

    • 从 top 位开始自旋向下找到给定任务,如果找到把它从当前 Worker 的任务队列中移除并执行它。
      • 如果任务队列为空或者任务未执行完毕返回 true。
      • 任务执行完毕返回 false。
    final boolean tryRemoveAndExec(ForkJoinTask<?> task) {
        ForkJoinTask<?>[] a;
        int m, s, b, n;
        if ((a = array) != null && (m = a.length - 1) >= 0 &&
                task != null) {
            while ((n = (s = top) - (b = base)) > 0) {
                //从top往下自旋查找
                for (ForkJoinTask<?> t; ; ) {      // traverse from s to b
                    long j = ((--s & m) << ASHIFT) + ABASE;//计算任务索引
                    if ((t = (ForkJoinTask<?>) U.getObject(a, j)) == null) //获取索引到的任务
                        return s + 1 == top;     // shorter than expected
                    else if (t == task) { //给定任务为索引任务
                        boolean removed = false;
                        if (s + 1 == top) {      // pop
                            if (U.compareAndSwapObject(a, j, task, null)) { //弹出任务
                                U.putOrderedInt(this, QTOP, s); //更新top
                                removed = true;
                            }
                        } else if (base == b)      // replace with proxy
                            removed = U.compareAndSwapObject(
                                    a, j, task, new EmptyTask()); //join任务已经被移除,替换为一个占位任务
                        if (removed)
                            task.doExec(); //执行
                        break;
                    } else if (t.status < 0 && s + 1 == top) { //给定任务不是top任务
                        if (U.compareAndSwapObject(a, j, t, null)) //弹出任务
                            U.putOrderedInt(this, QTOP, s);//更新top
                        break;                  // was cancelled
                    }
                    if (--n == 0) //遍历结束
                        return false;
                }
                if (task.status < 0) //任务执行完毕
                    return false;
            }
        }
        return true;
    }
    

    helpStealer 方法

    private void helpStealer(WorkQueue w, ForkJoinTask<?> task) {
        WorkQueue[] ws = workQueues;
        int oldSum = 0, checkSum, m;
        if (ws != null && (m = ws.length - 1) >= 0 && w != null &&
                task != null) {
            do {                                       // restart point
                checkSum = 0;                          // for stability check
                ForkJoinTask<?> subtask;
                WorkQueue j = w, v;                    // v is subtask stealer
                descent:
                for (subtask = task; subtask.status >= 0; ) {
                    //1. 找到给定WorkQueue的偷取者v
                    for (int h = j.hint | 1, k = 0, i; ; k += 2) {//跳两个索引,因为Worker在奇数索引位
                        if (k > m)                     // can't find stealer
                            break descent;
                        if ((v = ws[i = (h + k) & m]) != null) {
                            if (v.currentSteal == subtask) {//定位到偷取者
                                j.hint = i;//更新stealer索引
                                break;
                            }
                            checkSum += v.base;
                        }
                    }
                    //2. 帮助偷取者v执行任务
                    for (; ; ) {                         // help v or descend
                        ForkJoinTask<?>[] a;            //偷取者内部的任务
                        int b;
                        checkSum += (b = v.base);
                        ForkJoinTask<?> next = v.currentJoin;//获取偷取者的join任务
                        if (subtask.status < 0 || j.currentJoin != subtask ||
                                v.currentSteal != subtask) // stale
                            break descent; // stale,跳出descent循环重来
                        if (b - v.top >= 0 || (a = v.array) == null) {
                            if ((subtask = next) == null)   //偷取者的join任务为null,跳出descent循环
                                break descent;
                            j = v;
                            break; //偷取者内部任务为空,可能任务也被偷走了;跳出本次循环,查找偷取者的偷取者
                        }
                        int i = (((a.length - 1) & b) << ASHIFT) + ABASE;//获取base偏移地址
                        ForkJoinTask<?> t = ((ForkJoinTask<?>)
                                U.getObjectVolatile(a, i));//获取偷取者的base任务
                        if (v.base == b) {
                            if (t == null)             // stale
                                break descent; // stale,跳出descent循环重来
                            if (U.compareAndSwapObject(a, i, t, null)) {//弹出任务
                                v.base = b + 1;         //更新偷取者的base位
                                ForkJoinTask<?> ps = w.currentSteal;//获取调用者偷来的任务
                                int top = w.top;
                                //首先更新给定workQueue的currentSteal为偷取者的base任务,然后执行该任务
                                //然后通过检查top来判断给定workQueue是否有自己的任务,如果有,
                                // 则依次弹出任务(LIFO)->更新currentSteal->执行该任务(注意这里是自己偷自己的任务执行)
                                do {
                                    U.putOrderedObject(w, QCURRENTSTEAL, t);
                                    t.doExec();        // clear local tasks too
                                } while (task.status >= 0 &&
                                        w.top != top && //内部有自己的任务,依次弹出执行
                                        (t = w.pop()) != null);
                                U.putOrderedObject(w, QCURRENTSTEAL, ps);//还原给定workQueue的currentSteal
                                if (w.base != w.top)//给定workQueue有自己的任务了,帮助结束,返回
                                    return;            // can't further help
                            }
                        }
                    }
                }
            } while (task.status >= 0 && oldSum != (oldSum = checkSum));
        }
    }
    
    • 如果队列为空或任务执行失败,说明任务可能被偷,调用此方法来帮助偷取者执行任务。基本思想是:偷取者帮助我执行任务,我去帮助偷取者执行它的任务。执行流程如下。
      1. 循环定位偷取者,由于 Worker 是在奇数索引位,所以每次会跳两个索引位。定位到偷取者之后,更新调用者 WorkQueue 的 hint 为偷取者的索引,方便下次定位。
      2. 定位到偷取者后,开始帮助偷取者执行任务。从偷取者的 base 索引开始,每次偷取一个任务执行。在帮助偷取者执行任务后,如果调用者发现本身已经有任务 w.top != top,则依次弹出自己的任务(LIFO 顺序)并执行(也就是说自己偷自己的任务执行)。

    tryCompensate 方法

    //执行补偿操作:尝试缩减活动线程量,可能释放或创建一个补偿线程来准备阻塞
    private boolean tryCompensate(WorkQueue w) {
        boolean canBlock;
        WorkQueue[] ws;
        long c;
        int m, pc, sp;
        if (w == null || w.qlock < 0 ||           // caller terminating
                (ws = workQueues) == null || (m = ws.length - 1) <= 0 ||
                (pc = config & SMASK) == 0)           // parallelism disabled
            canBlock = false; //调用者已终止
        else if ((sp = (int) (c = ctl)) != 0)      // release idle worker
            canBlock = tryRelease(c, ws[sp & m], 0L);//唤醒等待的工作线程
        else {//没有空闲线程
            int ac = (int) (c >> AC_SHIFT) + pc; //活跃线程数
            int tc = (short) (c >> TC_SHIFT) + pc;//总线程数
            int nbusy = 0;                        // validate saturation
            for (int i = 0; i <= m; ++i) {        // two passes of odd indices
                WorkQueue v;
                if ((v = ws[((i << 1) | 1) & m]) != null) {//取奇数索引位
                    if ((v.scanState & SCANNING) != 0)//没有正在运行任务,跳出
                        break;
                    ++nbusy;//正在运行任务,添加标记
                }
            }
            if (nbusy != (tc << 1) || ctl != c)
                canBlock = false;                 // unstable or stale
            else if (tc >= pc && ac > 1 && w.isEmpty()) {//总线程数大于并行度 && 活动线程数大于1 && 调用者任务队列为空,不需要补偿
                long nc = ((AC_MASK & (c - AC_UNIT)) |
                        (~AC_MASK & c));       // uncompensated
                canBlock = U.compareAndSwapLong(this, CTL, c, nc);//更新活跃线程数
            } else if (tc >= MAX_CAP ||
                    (this == common && tc >= pc + commonMaxSpares))//超出最大线程数
                throw new RejectedExecutionException(
                        "Thread limit exceeded replacing blocked worker");
            else {                                // similar to tryAddWorker
                boolean add = false;
                int rs;      // CAS within lock
                long nc = ((AC_MASK & c) |
                        (TC_MASK & (c + TC_UNIT)));//计算总线程数
                if (((rs = lockRunState()) & STOP) == 0)
                    add = U.compareAndSwapLong(this, CTL, c, nc);//更新总线程数
                unlockRunState(rs, rs & ~RSLOCK);
                //运行到这里说明活跃工作线程数不足,需要创建一个新的工作线程来补偿
                canBlock = add && createWorker(); // throws on exception
            }
        }
        return canBlock;
    }
    
    • 需要和不需要补偿的几种情况。
      • 需要补偿。
        • 调用者队列不为空,并且有空闲工作线程,这种情况会唤醒空闲线程(调用 tryRelease() 方法)。
        • 池尚未停止,活跃线程数不足,这时会新建一个工作线程(调用 createWorker() 方法)。
      • 不需要补偿。
        • 调用者已终止或池处于不稳定状态。
        • 总线程数大于并行度并且活动线程数大于 1 并且调用者任务队列为空。

    4. Fork/Join 的注意事项

    避免不必要的 fork

    • 划分成两个子任务后,不要同时调用两个子任务的 fork() 方法。
      • 直接调用 compute() 效率更高。
      • 直接调用子任务的 compute() 方法实际上就是在当前的工作线程进行了计算(线程重用),这比 " 将子任务提交到工作队列,线程又从工作队列中拿任务 " 快得多。
    任务提交
    • 当一个大任务被划分成两个以上的子任务时,尽可能使用 invokeAll() 方法,因为使用它们能避免不必要的 fork()

    注意 fork、compute、join 的顺序

    right.fork(); // 计算右边的任务
    long leftAns = left.compute(); // 计算左边的任务(同时右边任务也在计算)
    long rightAns = right.join(); // 等待右边的结果
    return leftAns + rightAns;
    
    • 下面两种实际上都没有并行。
    left.fork(); // 计算完左边的任务
    long leftAns = left.join(); // 等待左边的计算结果
    long rightAns = right.compute(); // 再计算右边的任务
    return leftAns + rightAns;
    
    long rightAns = right.compute(); // 计算完右边的任务
    left.fork(); // 再计算左边的任务
    long leftAns = left.join(); // 等待左边的计算结果
    return leftAns + rightAns;
    

    选择合适的子任务粒度

    • 选择划分子任务的粒度(顺序执行的阈值)很重要,因为使用 Fork/Join 框架并不一定比顺序执行任务的效率高。
      • 如果任务太大,则无法提高并行的吞吐量。
      • 如果任务太小,子任务的调度开销可能会大于并行计算的性能提升,还需要考虑创建子任务、fork() 子任务、线程调度以及合并子任务处理结果的耗时以及相应的内存消耗。
    • 官方文档 给出的粗略经验是:任务应该执行100~10000个基本的计算步骤。决定子任务的粒度的最好办法是实践,通过实际测试结果来确定这个阈值才是 " 上上策 "。
    • 和其他 Java 代码一样,Fork/Join 框架测试时需要 " 预热 " 或者说执行几遍才会被 JIT(Just-in-time)编译器 优化,所以测试性能之前需要先跑几遍程序。

    避免重量级任务划分与结果合并

    • Fork/Join 的很多使用场景都用到数组或者 List 等数据结构,子任务在某个分区中运行,最典型的例子如并行排序和并行查找。
      • 拆分子任务以及合并处理结果的时候,应该尽量避免 System.arraycopy 这样耗时耗空间的操作,从而最小化任务的处理开销。

    5 Fork/Join 的异常处理

    • 在 ForkJoinTask 的 invoke()join() 方法及其衍生方法中都没有像 get() 方法那样抛出 ExecutionException 的受检异常。
      • ForkJoinTask 中把受检异常转换成了运行时异常。
    static void rethrow(Throwable ex) {
        if (ex != null)
            ForkJoinTask.<RuntimeException>uncheckedThrow(ex);
    }
    
    @SuppressWarnings("unchecked")
    static <T extends Throwable> void uncheckedThrow(Throwable t) throws T {
        throw (T)t; // rely on vacuous cast
    }
    
    • JVM 实际并不关心这个异常是受检异常还是运行时异常,受检异常这东西完全是给 Java 编译器用的,用于警告程序员这里有个异常没有处理。
    • invoke()join() 仍可能会抛出运行时异常,所以 ForkJoinTask 还提供了两个不提取结果和异常的方法 quietlyInvoke()quietlyJoin(),这两个方法允许在所有任务完成后对结果和异常进行处理。
      • 使用 quitelyInvoke()quietlyJoin() 时可以配合 isCompletedAbnormally()isCompletedNormally() 方法使用。

    参考资料

    https://www.cnblogs.com/cjsblog/p/9078341.html
    http://www.importnew.com/27334.html
    https://blog.csdn.net/abc123lzf/article/details/82873181
    http://blog.dyngr.com/blog/2016/09/15/java-forkjoinpool-internals/#%E5%8E%9F%E7%90%86
    https://blog.csdn.net/Holmofy/article/details/82714665
    https://blog.csdn.net/yongchao940/article/details/83027082

    相关文章

      网友评论

        本文标题:【Java 并发笔记】Fork/Join 框架相关整理(下)

        本文链接:https://www.haomeiwen.com/subject/zizidqtx.html