ForkjoinPool -3

作者: 程序员札记 | 来源:发表于2022-03-17 13:03 被阅读0次

    任务分割与等待:fork和join

    fork和join是ForkJoinTask的方法,也是整个框架的设计灵魂:fork把任务切分为小任务,join则等待任务结果。

    fork

    ForkJoinTask fork的实现异常简单:

        public final ForkJoinTask<V> fork() {
            Thread t;
            if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
             // 放到当前线程的workqueue
                ((ForkJoinWorkerThread)t).workQueue.push(this);
            else
                ForkJoinPool.common.externalPush(this);
            return this;
        }
    

    common 是一个静态的ForkjoinPool,不可关闭。即使你不new 一个ForkjoinPool ,直接调用fork ,此任务就直接交给common 了。 可以这么理解说,ForkjoinPool 有一个default , common 的构造在Forkjoinpool的static 代码段中。

        static {
            // initialize field offsets for CAS etc
            try {
    ...
    
            common = java.security.AccessController.doPrivileged
                (new java.security.PrivilegedAction<ForkJoinPool>() {
                    public ForkJoinPool run() { return makeCommonPool(); }});
            int par = common.config & SMASK; // report 1 even if threads disabled
            commonParallelism = par > 0 ? par : 1;
        }
    
    
    

    join

        public final V join() {
            int s;
       
            if ((s = doJoin() & DONE_MASK) != NORMAL)
                reportException(s);
            return getRawResult();
        }
    
    

    doJoin 是一个等待任务执行结束的方法,当任务执行完毕的时候就会返回。这里虽然等待,确不一定返回。

        private int doJoin() {
            int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
            return (s = status) < 0 ? s :
                ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
                (w = (wt = (ForkJoinWorkerThread)t).workQueue).
                tryUnpush(this) && (s = doExec()) < 0 ? s :
                wt.pool.awaitJoin(w, this, 0L) :
                externalAwaitDone();
        }
    
    

    fork方法逻辑:

    • 判断当前线程是否为池中的工作线程类型
      是:将当前任务压入当前线程的任务队列中
      不是:将当前任务压入common池中某个工作线程的任务队列中
    • 返回当前的ForkJoinTask任务对象,方便递归拆分

    doJoin&join方法逻辑:

    • 判断任务状态status是否小于0:
      小于:代表任务已经结束,返回status值
      不小于:判断当前线程是否为池中的工作线程:
      是:尝试从栈顶/队尾取出当前task执行:
      任务在栈顶:执行任务并返回执行结束的status值
      不在栈顶:调用awaitJoin方法等待执行结束
      不是:调用externalAwaitDone()方法阻塞挂起当前线程,等待任务执行结束
    • 判断任务执行状态是否为非正常结束状态,是则抛出异常堆栈信息
      任务状态为被取消,抛出CancellationException异常
      任务状态为异常结束,抛出对应的执行异常信息
    • 如果status为正常结束状态,则直接返回执行结果

    state 状态

        volatile int status; // accessed directly by pool and workers
        static final int DONE_MASK   = 0xf0000000;  // mask out non-completion bits
        static final int NORMAL      = 0xf0000000;  // must be negative
        static final int CANCELLED   = 0xc0000000;  // must be < NORMAL
        static final int EXCEPTIONAL = 0x80000000;  // must be < CANCELLED
        static final int SIGNAL      = 0x00010000;  // must be >= 1 << 16
        static final int SMASK       = 0x0000ffff;  // short bits for tags
    

    所以status <0 代表任务已经完成,取消或者异常,总之就是任务结束了。
    上面这段代码的意思就是当前任务是否结束,没有就看是不是ForkJoinWorkerThread,

    • 如果不是就externalAwaitDone。
    • 如果是 就尝试出队并执行,如果不能出队或者无法执行,则调用wt.pool.awaitJoin(w, this, 0L)等待

    tryUnpush

    工作线程在合并结果时,如果这个任务被fork到了栈顶/队尾,那么执行该任务返回即可。但如果不在栈顶,有可能是被其他fork出的任务压下去了或者其他线程被窃取了,那么则会进入awaitJoin()方法。
    externalAwaitDone 基本上是个wait notify 方法

    
        private int externalAwaitDone() {
            int s = ((this instanceof CountedCompleter) ? // try helping
                     ForkJoinPool.common.externalHelpComplete(
                         (CountedCompleter<?>)this, 0) :
                     ForkJoinPool.common.tryExternalUnpush(this) ? doExec() : 0);
            if (s >= 0 && (s = status) >= 0) {
                boolean interrupted = false;
                do {
                    if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
                        synchronized (this) {
                            if (status >= 0) {
                                try {
                                    wait(0L);
                                } catch (InterruptedException ie) {
                                    interrupted = true;
                                }
                            }
                            else
                                notifyAll();
                        }
                    }
                } while ((s = status) >= 0);
                if (interrupted)
                    Thread.currentThread().interrupt();
            }
            return s;
        }
    

    awaitJoin

    awaitJoin就相当复杂,要帮助当然执行worker 尽快偷到一个任务来执行。

        final int awaitJoin(WorkQueue w, ForkJoinTask<?> task, long deadline) {
            int s = 0;
            if (task != null && w != null) {
             //把currentJoin 仅仅在这个方法内使用,currentJoin赋给 prevJoin 说明task 不能是循环依赖的,必须是个有向无环图。
            //即使有循环依赖,在下面的程序里也有办法解决,就是设置超时时间。所以wait 和internalWait 设置了超时时间。
                ForkJoinTask<?> prevJoin = w.currentJoin;
                U.putOrderedObject(w, QCURRENTJOIN, task);
                CountedCompleter<?> cc = (task instanceof CountedCompleter) ?
                    (CountedCompleter<?>)task : null;
                for (;;) {
                    // 如果task 完成
                    if ((s = task.status) < 0)
                        break;
                    if (cc != null)
                        helpComplete(w, cc, 0);
                    //如果当前没有任务,现在task 未完成,则task 被窃取了
                    else if (w.base == w.top || w.tryRemoveAndExec(task))
                    //帮助stealer执行任务,毕竟当前自身队列已经执行完了,所以帮助stealer 执行任务
                    // 以尽快返回
                        helpStealer(w, task);
                    if ((s = task.status) < 0)
                        break;
                    long ms, ns;
                    if (deadline == 0L)
                        ms = 0L;
                    else if ((ns = deadline - System.nanoTime()) <= 0L)
                        break;
                    else if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) <= 0L)
                        ms = 1L;
                   //尝试增加一个线程作为补偿,因为当前线程准备进入等待,能到达这里表示任务完成,并且队列里还有任务
                    if (tryCompensate(w)) {
                        task.internalWait(ms);
                        U.getAndAddLong(this, CTL, AC_UNIT);
                    }
                }
                U.putOrderedObject(w, QCURRENTJOIN, prevJoin);
            }
            return s;
        }
    

    awaitJoin方法的总体逻辑还算简单,如下:

    • 检查当前线程的工作队列是否为空
      为空:代表任务被窃取了
      不为空:通过tryRemoveAndExec在整个队列中查找当前需要join的任务
      找到了:执行任务
      没找到:代表任务还是被偷了(这种情况下不参与helpStealer方法)
      -如果任务被偷了,那么通过helpStealer找到窃取者,帮助它执行任务
      -如果从helpStealer方法中退出,会再检查一次任务是否已完成:
      已执行结束:退出循环,出去合并结果
      未执行结束:准备进入阻塞,避免CPU资源浪费
      -在进入阻塞之前,会先对线程池进行补偿,因为当前线程可能是线程池中的最后一个活跃线程,为了避免线程池所有线程都“死”掉,会先为线程池补偿一个活跃线程

    tryRemoveAndExec

    tryRemoveAndExec从队列中取出task执行,或者已经取消的task

    
            final boolean tryRemoveAndExec(ForkJoinTask<?> task) {
                ForkJoinTask<?>[] a; int m, s, b, n;
                if ((a = array) != null && (m = a.length - 1) >= 0 &&
                    task != null) {
                    while ((n = (s = top) - (b = base)) > 0) {
                        for (ForkJoinTask<?> t;;) {      // traverse from s to b
                           //从top的第一位置,开始遍历
                            long j = ((--s & m) << ASHIFT) + ABASE;
                         //获取到的top 为空, 并且s+1 = top, 则当然队列为空,否则非空,如果队列非空,说明所有的元素都遍历了
                            if ((t = (ForkJoinTask<?>)U.getObject(a, j)) == null)
                                return s + 1 == top;     // shorter than expected
                            else if (t == task) {
                                boolean removed = false;
                                if (s + 1 == top) {      // pop
                                    if (U.compareAndSwapObject(a, j, task, null)) {
                                        U.putOrderedInt(this, QTOP, s);
                                        removed = true;
                                    }
                                }
    //如果到这里,说明t 在base 和base-2 之间, base=b 表示没有任何task被偷
    //如果可能被偷,则很坑就是t, +1 判断
                                else if (base == b)      // replace with proxy
                                    removed = U.compareAndSwapObject(
                                        a, j, task, new EmptyTask());
                                if (removed)
                                    task.doExec();
                                break;
                            }
                            else if (t.status < 0 && s + 1 == top) {
                                if (U.compareAndSwapObject(a, j, t, null))
                                    U.putOrderedInt(this, QTOP, s);
                                break;                  // was cancelled
                            }
                            if (--n == 0)
                                return false;
                        }
                        if (task.status < 0)
                            return false;
                    }
                }
                return true;
            }
    
    
    

    tryRemoveAndExec方法的逻辑,如下:
    判断队列中是否有任务:

    • 不存在:返回true,外部的awaitJoin方法进入helpStealer逻辑
      • 存在:判断任务是否在队列尾部/栈底:
        • 在:尝试CAS弹出栈顶任务:
          • 成功:执行任务
          • 失败:代表CAS失败,任务被别的线程偷走了,进入helpStealer逻辑
        • 不在:可能被其他任务压下去了,从栈顶开始查找整个队列:
          • 找到了:将任务替换成EmptyTask对象,执行任务
          • 没找到:代表任务被偷了,但虽然没找到,也不参与helpStealer了,不过在退出之前会再一次检测任务是否执行完成

    tryRemoveAndExec方法比较简单,该方法主要作用是遍历当前线程的WorkQueue,在队列中查找要join合并的任务执行。而在执行过程中,如果队列为空或者任务在栈顶但cas失败以及遍历完整个队列都没找到要join的任务,这三种情况代表任务被偷了,对于前两种情况下,会进入helpStealer帮助窃取者执行任务,而对于最后一种被窃取任务的情况,则会直接退出阻塞(个人猜测:可能是因为遍历完整个队列会导致一段时间的开销,被窃取走的任务很有可能在这段时间内已经执行完了或快执行完了。所以与其去帮助窃取者执行任务,还不如阻塞等待一会儿)。

    helpStealer

    在看Forkjoinpool 的helpStealer方法

    private void helpStealer(WorkQueue w, ForkJoinTask<?> task) { // 帮助偷取者(偷取自己任务的线程)执行任务
            WorkQueue[] ws = workQueues;
            int oldSum = 0, checkSum, m;
            if (ws != null && (m = ws.length - 1) >= 0 && w != null && task != null) {
                do { // restart point
                    checkSum = 0; // for stability check
                    ForkJoinTask<?> subtask;
                    WorkQueue j = w, v; // v是子任务的偷取者
                    descent: for (subtask = task; subtask.status >= 0;) { // 从目标任务开始,记录当前Join的任务,也包括偷取者当前Join的任务,递归帮助
                        for (int h = j.hint | 1, k = 0, i;; k += 2) { // 每次跳2个,遍历奇数位索引
                            if (k > m) // 如果遍历一遍没有找到偷取者,跳出循环
                                break descent;
                            if ((v = ws[i = (h + k) & m]) != null) {
                                if (v.currentSteal == subtask) { // 定位到偷取者,更新hint为偷取者索引,方便下次定位
                                    j.hint = i;
                                    break;
                                }
                                checkSum += v.base; // 若没有定位到,则累加工作队列的base值,继续遍历
                            }
                        }
                        for (;;) { // 帮助偷取者执行任务
                            ForkJoinTask<?>[] a; // 偷取者的任务数组
                            int b;
                            checkSum += (b = v.base); // 累加偷取者的base值
                            ForkJoinTask<?> next = v.currentJoin; // 记录偷取者Join的任务
                            if (subtask.status < 0 || j.currentJoin != subtask || v.currentSteal != subtask) // subtask结束,或者数据不一致了(j.currentJoin != subtask || v.currentSteal != subtask)
                                break descent; // 跳出外层循环重来
                            if (b - v.top >= 0 || (a = v.array) == null) { // 偷取者的任务列表为空
                                if ((subtask = next) == null) // 偷取者的Join任务为空,跳出外层循环
                                    break descent;
                                j = v; // 否则,j取v的值(j指向被偷者,v指向偷取者),且subtask指向next Join任务
                                break; // 继续遍历,寻找偷取者的偷取者(递归)
                            }
                            int i = (((a.length - 1) & b) << ASHIFT) + ABASE; // 偷取者的base内存地址
                            ForkJoinTask<?> t = ((ForkJoinTask<?>) U.getObjectVolatile(a, i)); // 获取base位置任务
                            if (v.base == b) {
                                if (t == null) // 任务为空,跳出外层循环(可能被别的线程拿走了)
                                    break descent;
                                if (U.compareAndSwapObject(a, i, t, null)) { // poll(从base位置取出任务)
                                    v.base = b + 1; // 更新base的值
                                    ForkJoinTask<?> ps = w.currentSteal; // 记录调用者之前偷取的任务
                                    int top = w.top; // 记录调用者的top值
                                    do {
                                        U.putOrderedObject(w, QCURRENTSTEAL, t); // 更新currentSteal为刚刚偷取到的任务
                                        t.doExec(); // 指向任务
                                    } while (task.status >= 0 && w.top != top && (t = w.pop()) != null); // 如果任务未结束,且自己任务队列不为空,优先处理自己队列里的任务
                                    U.putOrderedObject(w, QCURRENTSTEAL, ps); // 把之前偷取的任务设置回currentSteal
                                    if (w.base != w.top) // 自己队列来新任务了,直接返回
                                        return;
                                }
                            }
                        }
                    }
                } while (task.status >= 0 && oldSum != (oldSum = checkSum)); // Join的任务未结束,且任务在流动中,继续帮助执行
            }
        }
    
    
    • 每次跳2个槽位,遍历奇数位索引,直到定位到偷取者,并记录偷取者的索引(hint = i),方便下次定位。
      获取偷取者的任务列表,帮助其执行任务,如果执行过程中发现自己任务列表里有任务,则依次弹出执行。
    • 如果偷取者任务队列为空,则帮助其执行Join任务,寻找偷取者的偷取者,如此往复,加快任务执行。
    • 如果最后发现自己任务队列不为空(base != top),则退出帮助。
    • 最后判断任务task是否结束,如果未结束,且工作队列base和在变动中,说明偷取任务一直在进行,则重复以上操作,加快任务执行

    }
    该方法是ForkJoin框架实现“工作窃取思想”的核心体现。它与scan扫描方法完成了整个框架“工作窃取”实现。在scan方法之后的runTask方法中,会对currentSteal赋值,而helpStealer方法就是依赖于该成员与currentJoin成员形成的一条窃取链,实现了帮助窃取者执行任务。总而言之,helpStealer方法的核心思想是帮助执行,帮助窃取者执行它的任务,但它不仅仅只会帮助窃取者执行,还会基于currentSteal与currentJoin成员形成的窃取链帮助窃取者的窃取者执行、帮助窃取者的窃取者的窃取者执行、帮助窃取者.....的窃取者执行任务。上个例子理解,如下:

    • 线程T1需要join合并任务TaskA,但是TaskA被偷了,开始遍历查找所有奇数队列
      -查找后发现TaskA==线程T2.currentSteal成员,此时T2为T1的窃取者

    • T1从T2的队列栈底窃取一个任务执行,执行完再窃取一个执行,继续窃取....

    • T1发现T2的队列中没有了任务,T1则会继续寻找窃取了T2.currentlJoin的线程

    • 经过遍历发现T2.currentlJoin==T5.currentSteal,T5为T2的窃取者

    • 然后T1继续从T5队列的栈底窃取一个任务执行,完成后继续窃取.....
      -T1发现T5的队列中也没有了任务,T1会继续寻找窃取了T5.currentlJoin的....
      -根据窃取链,一直这样循环下去.....
      通过如上过程可发现:T1.currentlJoin → T2.currentSteal → T2.currentlJoin → T5.currentSteal → T5.currentlJoin....,通过currentSteal与currentJoin两个成员构成了一条窃取链,如果理解了这条链路关系,那么也就理解了helpStealer方法。不过值得注意的是:helpStealer方法什么时候退出呢?答案是:窃取链断掉的时候会退出。总共有三种情况会导致窃取链断掉:

    • 任何一个工作线程的currentSteal或currentJoin为空

    • 任何一个工作线程的currentSteal或currentJoin已经执行完成
      ③当前线程的join任务已经执行完成
      其实说到底,helpStealer方法是ForkJoin框架的一个优化性能的实现点,核心点在于减少线程因为合并而阻塞,在等待join任务执行期间帮其它线程执行一个任务,这样则保证了每个线程不停止工作,也能够加快整体框架的处理速度,同时在帮助执行的期间,被窃取的join任务就执行完了

    tryCompensate补偿活跃线程方法

    再来看看为线程池补偿活跃线程的tryCompensate方法:

    // ForkJoinPool类 → tryCompensate()方法
    private boolean tryCompensate(WorkQueue w) {
        boolean canBlock;
        WorkQueue[] ws; long c; int m, pc, sp;
        // 如果线程池已经停止,处于terminate状态,不能阻塞,也不需要阻塞
        if (w == null || w.qlock < 0 ||           // caller terminating
            (ws = workQueues) == null || (m = ws.length - 1) <= 0 ||
            (pc = config & SMASK) == 0)           // parallelism disabled
            canBlock = false;
        // 如果ctl的低32位中有挂起的空闲线程,那么尝试唤醒它,成功则阻塞自己
        // 唤醒后在一定程度上也许会执行到自己被偷的任务fork出的子任务
        // tryRelease第二个参数为0,当唤醒成功后,代表当前线程将被阻塞,
        // 新的空闲线程被唤醒,所以没必要先减少活跃线程数,然后再加上
        else if ((sp = (int)(c = ctl)) != 0)      // release idle worker
            canBlock = tryRelease(c, ws[sp & m], 0L);
        // 如果没有空闲线程,就要创建新的线程
        // 这里会导致线程池中的线程数,在一段时间内会超过创建时指定的并行数
        else {
            // 获取池中的活跃线程数
            int ac = (int)(c >> AC_SHIFT) + pc;
            // 获取池中的总线程数
            int tc = (short)(c >> TC_SHIFT) + pc;
            int nbusy = 0;       // validate saturation
            for (int i = 0; i <= m; ++i) {  // two passes of odd indices
                WorkQueue v;
                // 找奇数位置的队列,循环m次就是执行了两遍。
                // 为什么执行两遍呢?主要是为了判断稳定性,有可能第二遍
                //  的时候,正在执行任务的活跃线程会变少
                if ((v = ws[((i << 1) | 1) & m]) != null) {
                    // 检查工作线程是否正在处理任务,
                    // 如果不在处理任务表示空闲,可以获取其他任务执行
                    if ((v.scanState & SCANNING) != 0)
                        break;
                    ++nbusy;
                }
            }
            // 如果线程池状态不稳定,那么则不能挂起当前线程
            // 如果nbusy!=tc*2 说明还存在空闲或者还在扫描任务的工作线程
            // 如果ctl!=c 代表ctl发生了改变,有可能线程执行完任务后,
            // 没有扫描到新的任务被失活,这种情况下先不挂起,先自旋一段时间
            if (nbusy != (tc << 1) || ctl != c)
                canBlock = false;         // unstable or stale
            
            // tc:池内总线程数  pc:并行数 ac:池内活跃线程数
            // tc>=pc 代表此时线程数已经够多了,当然并不代表不会创建新线程
            // ac>1 代表除了自己外还有其他活跃线程
            // w.isEmpty() 当前工作线程队列为空,其中没有任务需要执行
            // 如果满足如上三个条件,那么则可以直接阻塞,不需要补偿
            else if (tc >= pc && ac > 1 && w.isEmpty()) {
                long nc = ((AC_MASK & (c - AC_UNIT)) |
                           (~AC_MASK & c));      // uncompensated
                //cas ctl
                canBlock = U.compareAndSwapLong(this, CTL, c, nc);
            }
            // 这是对于commonPool 公共线程池的特殊处理
            // 如果总线程数超出MAX_CAP则会抛出异常
            else if (tc >= MAX_CAP ||
                     (this == common && tc >= pc + commonMaxSpares))
                throw new RejectedExecutionException(
                    "Thread limit exceeded replacing blocked worker");
            else {                                // similar to tryAddWorker
                boolean add = false; int rs;      // CAS within lock
                // 准备创建新的工作线程(这里只加总线程数,不加活跃线程数)
                //      因为当前工作线程将在创建补偿线程成功之后阻塞
                // 但是这里会导致总线程数超出并行数
                long nc = ((AC_MASK & c) |
                           (TC_MASK & (c + TC_UNIT)));
                // 线程池没有停止的情况下才允许创建新的工作线程
                if (((rs = lockRunState()) & STOP) == 0)
                    add = U.compareAndSwapLong(this, CTL, c, nc);
                unlockRunState(rs, rs & ~RSLOCK);
                // 创建新的工作线程
                canBlock = add && createWorker(); // throws on exception
            }
        }
        return canBlock;
    }
    

    该方法内的逻辑也算比较简单:

    • 判断池内有没有挂起的空闲线程,如果有则唤醒它代替自己
    • 如果没有挂起的空闲线程,判断池内活跃线程数是否存在两个及以上、总线程数是否饱和、自己工作队列是否为空,如果这些都满足,那么则不需要补偿,直接挂起
    • 如果不满足上述三条件,判断线程数是否关闭,如果没有则创建新线程补偿
      值得一提的是:tryCompensate方法会导致一段时间内,池中总线程数超出创建线程池时指定的并行数。而且如果在用Fork/Join框架时,如果在ForkJoinTask中调用提交任务的方法:sumbit()/invoke()/execute()时,会导致线程池一直补偿线程,硬件允许的情况下会导致一直补偿创建出最大0x7fff = 32767条线程。

    externalAwaitDone方法

    前面分析doJoin逻辑提到过:如果是外部线程调用join方法时,会调用externalAwaitDone方法,接着再来看看这个方法:

    // ForkJoinPool类 → externalAwaitDone()方法
    private int externalAwaitDone() {
        // 如果任务是CountedCompleter类型,尝试使用common池去外部帮助执行,
        // 执行完成后并将完成任务状态返回
        int s = ((this instanceof CountedCompleter) ? // try helping
                 ForkJoinPool.common.externalHelpComplete(
                     (CountedCompleter<?>)this, 0) :
                     // 当前task不是CountedCompleter,尝试从栈顶获取到当前
                     // join的任务交给common池执行,如果不在栈顶,s变为0
                 ForkJoinPool.common.tryExternalUnpush(this) ? doExec() : 0);
        // 如果s>=0,那代表任务是未结束的状态,需要阻塞
        if (s >= 0 && (s = status) >= 0) {
            boolean interrupted = false;
            do {
                // 先设置SIGNAL信号标记,通知其他线程当前需要被唤醒
                if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
                    // 通过synchronized.wait()挂起线程
                    synchronized (this) {
                        if (status >= 0) { // 双重检测
                            try {
                                wait(0L);   // 挂起线程
                            } catch (InterruptedException ie) {
                                interrupted = true;
                            }
                        }
                        else
                            // 如果发现已完成,则唤醒所有等待线程
                            notifyAll();
                    }
                }
            // task未完成会一直循环
            } while ((s = status) >= 0);
            // 响应中断操作
            if (interrupted)
                Thread.currentThread().interrupt();
        }
        // 执行完成后返回执行状态
        return s;
    }
    

    externalAwaitDone方法最简单,如果任务在栈顶,那么直接弹出执行,如果不在则挂起当前线程,直至任务执行结束,其他线程唤醒

    任务拆分合并原理总结

    任务的fork操作比较简单,只需要将拆分好的任务push进入自己的工作队列即可。而对于任务结果的合并:join操作,实现就略显复杂了,大体思想是首先在自己队列中找需要join的任务,如果找到了则执行它并合并结果。如果没找到就是被偷了,需要去找窃取者线程,并且在join任务执行结束之前,会根据窃取链一直帮助窃取者执行任务,如果窃取链断了但是join任务还未执行完,那么挂起当前工作线程,不过在挂起之前会根据情况来决定是否为线程池补偿一条活跃线程代替自己工作,防止整个线程池所有的线程都阻塞,产生线程池“假死”状态。当然,如果是外部线程执行的join操作,如果要被join的任务还未执行完的情况下,那么则需要把这任务交给commonPool公共池来处理

    ForkJoin中任务取消实现原理

    任务取消的cancel方法是实现于Future接口的,逻辑比较简单,源码如下:

    // ForkJoinTask类 → cancel()方法
    public boolean cancel(boolean mayInterruptIfRunning) {
        // 尝试将任务状态修改为CANCELLED,成功返回true,失败返回false
        return (setCompletion(CANCELLED) & DONE_MASK) == CANCELLED;
    }
    
    // ForkJoinTask类 → setCompletion()方法
    private int setCompletion(int completion) {
        // 开启自旋(死循环)
        for (int s;;) {
            // 如果任务已经完成,则直接返回执行后的状态
            if ((s = status) < 0)
                return s;
            // 如果还未完成则尝试通过cas机制修改状态为入参:completion状态
            if (U.compareAndSwapInt(this, STATUS, s, s | completion)) {
                if ((s >>> 16) != 0)
                    synchronized (this) { notifyAll(); }
                return completion;
            }
        }
    }
    

    取消任务的逻辑比较简单,任务取消只能发生在任务还未被执行的情况下,如果任务已经完成则会直接返回执行状态。如果任务还未执行,则会尝试使用自旋+CAS机制修改任务状态为CANCELLED状态,成功则代表任务取消成功。

    POOL 终止

    private boolean tryTerminate(boolean now, boolean enable) {
            int rs;
            if (this == common) // 公共线程池,不能shutdown
                return false;
            if ((rs = runState) >= 0) {
                if (!enable)
                    return false;
                rs = lockRunState(); // 进入SHUTDOWN阶段
                unlockRunState(rs, (rs & ~RSLOCK) | SHUTDOWN);
            }
    
            if ((rs & STOP) == 0) { // 准备进入STOP阶段
                if (!now) { // 必要的检查
                    for (long oldSum = 0L;;) {
                        WorkQueue[] ws;
                        WorkQueue w;
                        int m, b;
                        long c;
                        long checkSum = ctl;
                        if ((int) (checkSum >> AC_SHIFT) + (config & SMASK) > 0)
                            return false; // 如果有活动的工作线程,还不能停止
                        if ((ws = workQueues) == null || (m = ws.length - 1) <= 0)
                            break;
                        for (int i = 0; i <= m; ++i) {
                            if ((w = ws[i]) != null) {
                                if ((b = w.base) != w.top || w.scanState >= 0 || w.currentSteal != null) {
                                    tryRelease(c = ctl, ws[m & (int) c], AC_UNIT);
                                    return false; // 有任务在执行,还不能停止
                                }
                                checkSum += b; // 累加base值
                                if ((i & 1) == 0)
                                    w.qlock = -1; // 偶数索引工作队列,qlock = -1, 拦截从外部提交的任务
                            }
                        }
                        if (oldSum == (oldSum = checkSum)) // 稳定了,退出
                            break;
                    }
                }
                if ((runState & STOP) == 0) { // 如果now等于true,立刻进入STOP结点
                    rs = lockRunState();
                    unlockRunState(rs, (rs & ~RSLOCK) | STOP);
                }
            }
    
            int pass = 0;
            for (long oldSum = 0L;;) {
                WorkQueue[] ws;
                WorkQueue w;
                ForkJoinWorkerThread wt;
                int m;
                long checkSum = ctl;
                if ((short) (checkSum >>> TC_SHIFT) + (config & SMASK) <= 0 || (ws = workQueues) == null
                        || (m = ws.length - 1) <= 0) { // 可以终止了
                    if ((runState & TERMINATED) == 0) {
                        rs = lockRunState();
                        unlockRunState(rs, (rs & ~RSLOCK) | TERMINATED); // 进入TERMINATED阶段
                        synchronized (this) {
                            notifyAll(); // 唤醒所有线程
                        }
                    }
                    break; // 跳出
                }
                for (int i = 0; i <= m; ++i) {
                    if ((w = ws[i]) != null) {
                        checkSum += w.base;
                        w.qlock = -1; // 设置不可用
                        if (pass > 0) {
                            w.cancelAll(); // 取消所有的任务
                            if (pass > 1 && (wt = w.owner) != null) {
                                if (!wt.isInterrupted()) {
                                    try {
                                        wt.interrupt(); // 中断线程
                                    } catch (Throwable ignore) {
                                    }
                                }
                                if (w.scanState < 0)
                                    U.unpark(wt); // 唤醒线程
                            }
                        }
                    }
                }
                if (checkSum != oldSum) { // 不稳定,重来
                    oldSum = checkSum;
                    pass = 0;
                } else if (pass > 3 && pass > m) // 退出
                    break;
                else if (++pass > 1) { // 出队
                    long c;
                    int j = 0, sp;
                    while (j++ <= m && (sp = (int) (c = ctl)) != 0)
                        tryRelease(c, ws[sp & m], AC_UNIT);
                }
            }
            return true;
        }
    

    SHUTDOWN(!common) -> STOP(无任务执行) -> TERMINATED(over)
    线程池关闭的实现逻辑也比较简单,首先会将线程池标记为SHUTDOWN状态,然后根据情况进行下一步处理,如果线程池中没啥活跃线程了,同时任务也不多了,将状态改为STOP状态,在STOP状态中会处理四件事:

    • 将所有活跃的队列状态改为注销状态,w.qlock=-1
    • 取消整个线程池中所有还未执行的任务
    • 唤醒所有因为失活挂起阻塞的线程
    • 尝试中断所有执行的活跃线程,唤醒scanState<0的线程,确保一些还没来得及挂起的线程也能被中断
      最后当所有线程都被中断了,并且未执行的任务都被取消了,那么会把状态改为TERMINATED状态,线程池关闭完成。

    fork-join

    1.有一个大的任务Task(8), 最终被分解成8个小任务Task(1)

    image

    2.将Task(8)加入到任务队列里面(偶数索引,图中未显示),线程A偷取到Task(8),fork了2个Task(4),push到任务队列里面

    image

    3.pop出Task(4), fork出2个Task(2),push到任务队列里面

    image

    4. pop出Task(2), fork出2个Task(1), push到任务队列里面

    image

    5.pop出任务Task(1),此刻已经达到最小粒度,开始执行该任务;与此同时,线程B从底部(base)位置steal走了Task(4)

    image

    6.线程B拿到Task(4)之后,fork出了2个Task(2),push到任务队列里面

    image

    7.线程A执行完自己的任务后,由于Task(4).join(),索性定位到偷走自己任务的线程B所在的工作队列,帮助其执行任务,整体加快任务进度,帮助的方式也是steal

    image

    以上是最简单的一种fork.join方式。

    最后的总结:

    • 创建池ForkJoinPool,初始化并行数=cpu逻辑核心数,池中没有队列,没有线程

    • 外部向线程池提交一个任务:pool.submit(task)

    • 初始化队列数组,容量:2 * Max { 并行数, 2 ^ n }

    • 创建一个共享队列,容量为2^13,随机放在队列数组的某一个偶数索引位

    • 外部提交的任务存入这个共享队列,位置值为2^12处

    • 再创建一条线程,并为其分配一个队列,容量为2^13,随机放在数组中某个奇数索引位

    • 线程启动执行

    • 随机一个位置,线程从此位置开始遍历所有队列,最终扫描到前面提交的任务,并将其从所在的队列取出

    • 线程执行处理任务,首先拆分出两个子任务

      • 如果用invokeAll提交,一个子任务执行,另一个压入队列
      • 如果用fork提交,则两个都压入工作队列
    • 提交的子任务触发创建新的线程并分配新的工作队列,同样放在奇数位置

    • 提交的子任务可能仍然被当前线程执行,但也有可能被其它线程窃取

    • 线程在子任务处join合并,join期间会帮助窃取者处理任务,窃取它的任务执行

      • 优先偷窃取者队列栈底的任务
      • 如果窃取者队列为空,则会根据窃取链去找窃取者的窃取者偷任务.....
      • 如果整个池内都没任务了,则进入阻塞,阻塞前会根据情况补偿活跃线程
    • 提交的子任务不管被哪条线程执行,仍可能会重复上述拆分/提交/窃取/阻塞步骤

    • 当任务被拆分的足够细,达到了拆分阈值时,才会真正的开始执行这些子任务

    • 处理完成会和拆分任务时一样,递归一层一层返回结果

    • 直至最终所有子任务全部都执行结束,从而合并所有子结果,得到最终结果

    • 如果外部没有再提交任务,所有线程扫描不到会被灭活,会进入失活(inactive)状态

    • 一直没有任务时,线程池会削减线程数,直至最终所有线程销毁,所有奇数索引位的队列被注销,ForkJoinPool中只剩下一个最初创建的在偶数索引位的队列,以便于再次接受外部提交的任务,然后再从头开始重复所有步骤....

    相关文章

      网友评论

        本文标题:ForkjoinPool -3

        本文链接:https://www.haomeiwen.com/subject/yhemdrtx.html