美文网首页
Java并发编程——ForkJoinPool之外部提交及work

Java并发编程——ForkJoinPool之外部提交及work

作者: 小波同学 | 来源:发表于2022-09-18 19:29 被阅读0次

一、ForkJoinPool

ForkJoinPool 是 JDK7 引入的,由 Doug Lea 编写的高性能线程池。核心思想是将大的任务拆分成多个小任务(即fork),然后在将多个小任务处理汇总到一个结果上(即join),非常像MapReduce处理原理。同时,它提供基本的线程池功能,支持设置最大并发线程数,支持任务排队,支持线程池停止,支持线程池使用情况监控,也是AbstractExecutorService的子类,主要引入了“工作窃取”机制,在多CPU计算机上处理性能更佳。其广泛用在java8的stream中。

Fork/Join Pool采用优良的设计、代码实现和硬件原子操作机制等多种思路保证其执行性能。其中包括(但不限于):计算资源共享、高性能队列、避免伪共享、工作窃取机制等。

二、工作队列WorkQueue

先对ForkJoinPool里的队列有一个感性认识,这个队列和java线程池的队列有很大区别,java线程池的任务队列是一个阻塞队列。而这里的队列是一个队列的数组,也就是多个队列,即ForkJoinPool每个线程都有自己的队列。另外偶数下标的队列用来保存外部提交的任务,奇数下标装的是线程自己的任务。

三、工作窃取的实现原理

ForkJoinPool类中的WorkQueue正是实现工作窃取的队列,javadoc中的注释如下:

大意是大多数操作都发生在工作窃取队列中(在嵌套类工作队列中)。这些是特殊形式的Deques,主要有push,pop,poll操作。

Deque是双端队列(double ended queue缩写),头部和尾部任何一端都可以进行插入,删除,获取的操作,即支持FIFO(队列)也支持LIFO(栈)顺序。

Deque接口的实现最常见的是LinkedList,除此还有ArrayDeque、ConcurrentLinkedDeque等。

工作窃取模式主要分以下几个步骤:

  • 1、每个线程都有自己的双端队列。
  • 2、当调用fork方法时,将任务放进队列头部,线程以LIFO顺序,使用push/pop方式处理队列中的任务。
  • 3、如果自己队列里的任务处理完后,会从其他线程维护的队列尾部使用poll的方式窃取任务,以达到充分利用CPU资源的目的。
  • 4、从尾部窃取可以减少同原线程的竞争。
  • 5、当队列中剩最后一个任务时,通过cas解决原线程和窃取线程的竞争。

流程大致如下所示:


工作窃取便是ForkJoinPool线程池的优势所在,在一般的线程池比如ThreadPoolExecutor中,如果一个线程正在执行的任务由于某种原因无法继续运行,那么该线程会处于等待状态,包括singleThreadPool、fixedThreadPool、cachedThreadPool这几种线程池。

而在ForkJoinPool中,那么线程会主动寻找其他尚未被执行的任务然后窃取过来执行,减少线程等待时间。

JDK8中的并行流(parallelStream)功能是基于ForkJoinPool实现的,另外还有java.util.concurrent.CompletableFuture异步回调future,内部使用的线程池也是ForkJoinPool。

四、ForkJoinPool总体介绍

在java中运行ForkJoinPool,经过对源码的分析,实际上,需要4个类来配合运行。这四个类分别是:

  • ForkJoinPool 这是线程池的核心类,也是提供方法供我们使用的入口类。基本上forkJoin的核心操作及线程池的管理方法都由这个类提供。

  • ForkJoinPool.WorkQueue 这是ForkJoinPool类的内部类。也是线程池核心的组成部分。ForkJoinPool线程池将由WorkQueue数组组成。为了进一步提高性能,与ThreadPoolExecutor不一样的是,这没有采用外部传入的任务队列,而是作者自己实现了一个阻塞队列。

  • ForkJoinWorkerThread 线程池中运行的thread也是作者重新定义的。这个类的目的是在于将外部的各种形式的task都转换为统一的ForkJoinTask格式。

  • ForkJoinTask 这是ForkJoinPool支持运行的task抽象类,我们一般使用其子类如RecursiveTask或者RecursiveAction。这个在前文有介绍。这个类提供了任务拆分和结果合并的方法。

五、ForkJoinPool源码分析

5.1 FokJoinPool的字段

5.1.1 常量

// Bounds 控制线程池相关边界的常量
// 二进制形式0b00000000_00000000_11111111_11111111  线程池索引掩码
static final int SMASK = 0xffff;        // short bits == max index

// 二进制形式0b00000000_000000000_1111111_11111111   工作者线程数的最大值
static final int MAX_CAP = 0x7fff;        // max #workers - 1

//二进制形式0b00000000_00000000_11111111_11111110
//用来取workQueues偶数槽下标,(二进制最低位强置为0,当最低位为0时,表示偶数)
static final int EVENMASK = 0xfffe;        // even short bits

//二进制形式0b00000000_00000000_00000000_01111110  最大的槽位值(限制最多只有64们偶数槽位)
static final int SQMASK = 0x007e;        // max 64 (even) slots


// Masks and units for WorkQueue.scanState and ctl sp subfield  
//控制WorkQueue的scanState属性 、及ForkJoinPool的ctl属性的低32位sp子属性
//二进制形式0b00000000_00000000_00000000_00000001  ,runState的最低位,1表示正在扫描,0表示正在执行任务
static final int SCANNING = 1;             // false when running tasks

//  二进制形式0b10000000_00000000_00000000_00000000    runState最高位为1,表示此线程是INACTIVE空闲的
static final int INACTIVE = 1 << 31;       // must be negative

// 0b00000000_00000001_00000000_00000000  版本号,让线程池索引加1,取下一个线程
static final int SS_SEQ = 1 << 16;       // version count

// Mode bits for ForkJoinPool.config and WorkQueue.config 控制ForkJoinPool.config和WorkQueue.config属性的常量

// 二进制形式0b11111111_11111111_00000000_00000000  获取当前队列mode的掩码,只取config的高16位
static final int MODE_MASK = 0xffff << 16;  // top half of int

// 二进制形式0b00000000_00000000_00000000_00000000 先进先出模式(异步模式),将WorkQueue看作(内部任务)队列
static final int LIFO_QUEUE = 0;

// 二进制形式0b00000000_00000001_00000000_00000000 后时先出模式(非异步模式),将WorkQueue看作(内部任务)栈
static final int FIFO_QUEUE = 1 << 16;

//  二进制形式0b10000000_00000000_00000000_00000000  共离模式,提交外部任务放入的队列
static final int SHARED_QUEUE = 1 << 31;       // must be negative

 
// Lower and upper word masks  分别取ctl的高32位和低32位的掩码
private static final long SP_MASK    = 0xffffffffL;
private static final long UC_MASK    = ~SP_MASK;

// Active counts  获取活动线程数的相关掩码
private static final int  AC_SHIFT   = 48;
private static final long AC_UNIT    = 0x0001L << AC_SHIFT;
private static final long AC_MASK    = 0xffffL << AC_SHIFT;

// Total counts 获取部线程数的相关掩码
private static final int  TC_SHIFT   = 32;
private static final long TC_UNIT    = 0x0001L << TC_SHIFT;
private static final long TC_MASK    = 0xffffL << TC_SHIFT;
private static final long ADD_WORKER = 0x0001L << (TC_SHIFT + 15); // sign

// runState bits: SHUTDOWN must be negative, others arbitrary powers of two
//执行器运行状态的掩码。
private static final int  RSLOCK     = 1;
private static final int  RSIGNAL    = 1 << 1;
private static final int  STARTED    = 1 << 2;
private static final int  STOP       = 1 << 29;
private static final int  TERMINATED = 1 << 30;
private static final int  SHUTDOWN   = 1 << 31;

上面这些常量,大多都是获取相关属性的各分割位的掩码。SMASK 、SQMASK是获取线程池中线程数相关边界的掩码,SCANNING 、INACTIVE等又是设置获取runState及 ctl的sp子属性相关分割位含义的掩码。

5.1.2 成员变量

// 用来配置ctl在控制线程数量使用
private static final long ADD_WORKER = 0x0001L << (TC_SHIFT + 15); // sign

//控制线程池数量(ctl & ADD_WORKER) != 0L 时创建线程,
// 也就是当ctl的第16位不为0时,可以继续创建线程
volatile long ctl;                   // main pool control

//全局锁控制,全局运行状态
volatile int runState;               // lockable status

//config二进制形式的低16位表示parallelism,
//config二进制形式的第高16位表示mode,1表示异步模式, 使用先进先出队列, 0表示同步模式, 使用先进后出栈
//低16位表示workerQueue在pool中的索引,高16位表示mode, 有FIFI LIFL 
final int config;  // parallelism, mode   
 
//生成workerQueue索引的重要依据
int indexSeed;         // to generate worker index  

//工作者队列数组,内部线程ForkJoinWorkerThread启动时会注册一个WorkerQueue对象到这个数组中
volatile WorkQueue[] workQueues;     // main registry 

//工作者线程线程工厂,创建ForkJoinWorkerThread的策略
final ForkJoinWorkerThreadFactory factory;  

//在线程因未捕异常而退出时,java虚拟机将回调的异常处理策略
final UncaughtExceptionHandler ueh;  // per-worker UEH 

//工作者线程名的前缀
final String workerNamePrefix;       // to create worker name string  

//执行器所有线程窃取的任务总数,也作为监视runState的锁
volatile AtomicLong stealCounter;    // also used as sync monitor

//通用的执行器,它在静态块中初始化
static final ForkJoinPool common; 

5.1.2 ctl

类似于ThreadPoolExecutor,在ForkJoinPool中也有一个ctl变量负责表达ForkJoinPool的整个生命周期和相关的各种状态。不过ctl变量更加复杂,是一个long型变量。

ctl 组成
/*
 * Bits and masks for field ctl, packed with 4 16 bit subfields:
 * AC: Number of active running workers minus target parallelism
 * TC: Number of total workers minus target parallelism
 * SS: version count and status of top waiting thread
 * ID: poolIndex of top of Treiber stack of waiters
 */

ctl变量的64个比特位被分成五部分:

  • AC:最高的16个比特位,表示Active线程数-parallelism,parallelism是上面的构造方法传进去的参数;

  • TC:次高的16个比特位,表示Total线程数-parallelism;

  • ST:1个比特位,如果是1,表示整个ForkJoinPool正在关闭;

  • EC:15个比特位,表示阻塞栈的栈顶线程的wait count;

  • ID:16个比特位,表示阻塞栈的栈顶线程对应的poolIndex。

  • 这个线程池支持的最大线程数为32767,如果超过会抛出异常。这里只支持32767的并行度是因为ctl的组成关系。只有16位用来存放线程数,最高位表示正负,所以只有15位来表示,也就是2的15次方减一。

阻塞栈–Treiber Stack
  • 1、实现多个线程的阻塞、唤醒,除了park/unpark这一对操作原语,还需要一个无锁链表实现的阻塞队列,把所有阻塞的线程串在一起。
  • 2、在ForkJoinPool中,使用了阻塞栈。把所有空闲的Worker线程放在一个栈里面。
  • 3、ForkJoinWorkerThread的poolIndex变量,记录了自己在ForkJoinWorkerThread[]数组中的下标位置,poolIndex变量就相当于每个ForkJoinPoolWorkerThread对象的地址;
  • 4、ForkJoinWorkerThread的nextWait变量,记录了前一个阻塞线程的poolIndex,nextWait变量就相当于链表的next指针,把所有的阻塞线程串联在一起,组成一个TreiberStack。
  • 5、ctl变量的最低16位,记录了栈的栈顶线程的poolIndex;中间的15位,记录了栈顶线程被阻塞的次数,也称为waitcount。

下图为所有阻塞的Worker线程组成的Treiber Stack。


  • 首先,WorkQueue有一个id变量,记录了自己在WorkQueue[]数组中的下标位置,id变量就相当于每个WorkQueue或ForkJoinWorkerThread对象的地址;

  • 其次,ForkJoinWorkerThread还有一个stackPred变量,记录了前一个阻塞线程的id,这个stackPred变量就相当于链表的next指针,把所有的阻塞线程串联在一起,组成一个Treiber Stack。

  • 最后,ctl变量的最低16位,记录了栈的栈顶线程的id;中间的15位,记录了栈顶线程被阻塞的次数,也称为wait count。

ctl变量的初始值
long np = (long)(-parallelism)
this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
  • 在初始时ForkJoinPool中的线程个数为0,AC=0-parallelism,TC=0-parallelism。即只有高32位的AC、TC 两个部分填充了值,低32位都是0填充。
ForkJoinWorkerThread状态与个数

在ThreadPoolExecutor中,有corePoolSize和maxmiumPoolSize 两个参数联合控制总的线程数,而在ForkJoinPool中只传入了一个parallelism参数,且这个参数并不是实际的线程数。那么,ForkJoinPool在实际的运行过程中,线程数究竟是由哪些因素决定的呢?

要回答这个问题,先得明白ForkJoinPool中的线程都可能有哪几种状态?

ForkJoinPool中的线程可能的三种状态

  • 1、空闲状态(放在Treiber Stack里面)。
  • 2、活跃状态(正在执行某个ForkJoinTask,未阻塞)。
  • 3、阻塞状态(正在执行某个ForkJoinTask,但阻塞了,于是调用join,等待另外一个任务的结果返回)。

ctl变量很好地反映出了三种状态:

  • 高32位:u=(int) (ctl >>> 32),然后u又拆分成tc、ac 两个16位;
  • 低32位:c=(int) ctl。
    • c>0,说明Treiber Stack不为空,有空闲线程;c=0,说明没有空闲线程;
    • ac>0,说明有活跃线程;ac<=0,说明没有空闲线程,并且还未超出parallelism;
    • tc>0,说明总线程数 >parallelism。

在提交任务的时候:


在通知工作线程的时候,需要判断ctl的状态,如果没有闲置的线程,则开启新线程:


ForkerJoinPool 没有使用 BlockingQueue,也就不利用其阻塞/唤醒机制,而是利用了park/unpark原语,并自行实现了Treiber Stack。

5.2 构造函数

ForkJoinPool提供了3个构造函数:

public ForkJoinPool(int parallelism,
                    ForkJoinWorkerThreadFactory factory,
                    UncaughtExceptionHandler handler,
                    boolean asyncMode) {
    this(checkParallelism(parallelism),
         checkFactory(factory),
         handler,
         asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
         "ForkJoinPool-" + nextPoolId() + "-worker-");
    checkPermission();
}

此方法没有进行初始化,但是有些字段需要进行说明:

属性 类型 说明
parallelism int 表示ForkJoinPool支持的最大并行度
factory ForkJoinWorkerThreadFactory 用于产生线程的工厂方法
handler UncaughtExceptionHandler 用于异常处理的拒绝策略
asyncMode boolean 异步模式,如果为ture,则队列将采用FIFO_QUEUE,实现先进先出,反之则LIFO_QUEUE 实现后进先出

这个方法中会对并行度和factory进程check,以确保系统能支持。
parallelism最大不允许大于MAX_CAP。最小必须大于0。
factory则只是判断是否为空,如果为空则这个地方会出现NPE异常。

private ForkJoinPool(int parallelism,
                     ForkJoinWorkerThreadFactory factory,
                     UncaughtExceptionHandler handler,
                     int mode,
                     String workerNamePrefix) {
    this.workerNamePrefix = workerNamePrefix;
    this.factory = factory;
    this.ueh = handler;
    this.config = (parallelism & SMASK) | mode;
    long np = (long)(-parallelism); // offset ctl counts
    this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
}

这个方法才是真正的初始化的构造方法。上述计算过程,假定parallelism 为7


然后再计算np和ctl


这样就完成了初始化,需要注意的是,这个位运算操作是ForkJoinPool的核心,需要扎实理解。

public ForkJoinPool(int parallelism) {
    this(parallelism, defaultForkJoinWorkerThreadFactory, null, false);
}

public ForkJoinPool() {
    this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
         defaultForkJoinWorkerThreadFactory, null, false);
}   

无参的构造函数,将通过MAX_CAP和当前系统允许的最大并行度的最小值来指定并行度。使用默认的ThreadFactory。handler位空,默认采用asyncMode为false.
实际实这个方法最终还是调用的上面的两个方法,完成了初始化工作。

5.3 ForkJoinPool的基本组成

再了解了前面的变量之后,我们可以发现,ForkJoinPool的实际组成是,由一个WorkQueue的数组构成。但是这个数组比较特殊,在偶数位,存放外部调用提交的任务,如通过execute和submit等方法。这种队列称为SubmissionQueue。而另外一种任务是前者在执行过程种通过fork方法产生的新任务。这种队列称为workQueue。

SubmissionQueue与WorkQueue的区别在于,WorkQueue的属性“final ForkJoinWorkerThread owner;”是有值的。也就是说,WorkQueue将有ForkJoinWorkerThread线程与之绑定。在运行过程中不断的从WorkQueue中获取任务。如果没有可执行的任务,则将从其他的SubmissionQueue和WorkQueue中窃取任务来执行。

前面学习过了工作窃取算法,实际上载WorkQueue上的ForkJoinWorkerThread就是一个窃取者,它从SubmissionQueue中或者去偷的WorkQueue中,按FIFO的方式窃取任务。之后执行。也从自己的WorkQueue中安LIFO或者FIFO的方式执行任务。这取决于模式的设定。默认情况下是采用LIFO的方式在执行。组成如下图所示:

所以工作队列数组的一个分布情况,它的大小一定是2次幂,奇数位和偶数位存放的虽然都是任务队列。但是奇数位是带工作线程的存放fork出的子任务的队列,偶数队列存放的是外部提交的任务。

外部任务提交的方法调用过程:


5.4 ForkJoinPool外部任务的提交

ForkJoinPool在初始化之后,支持三种调用方式 invoke 、execute 和submit。我们来对这三种方式进行分析。

5.4.1 invoke

public <T> T invoke(ForkJoinTask<T> task) {
    if (task == null)
        throw new NullPointerException();
    externalPush(task);
    return task.join();
}

可以看到invoke是支持有返回值的调用方法,之后调用externalPush进行push。

5.4.2 execute

public void execute(ForkJoinTask<?> task) {
    if (task == null)
        throw new NullPointerException();
    externalPush(task);
}

而execute方法,则是没有返回值的调用方法。适用于哪些不需要返回结果的计算。此外还有种变体,支持Runnable。实际上是将Runnable转换位ForkJoinTask。

public void execute(Runnable task) {
    if (task == null)
        throw new NullPointerException();
    ForkJoinTask<?> job;
    if (task instanceof ForkJoinTask<?>) // avoid re-wrap
        job = (ForkJoinTask<?>) task;
    else
        job = new ForkJoinTask.RunnableExecuteAction(task);
    externalPush(job);
}

5.4.3 submit

submit于invoke方法基本一致,唯一的不同是,方法内部没有join方法,需要自行定义。

public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
    if (task == null)
        throw new NullPointerException();
    externalPush(task);
    return task;
}

这样可能更加灵活。
此外也支持Runnable和Callable,而且Runnable也可以支持返回值。对于Callable,通过AdaptedCallable进行适配。

public <T> ForkJoinTask<T> submit(Callable<T> task) {
    ForkJoinTask<T> job = new ForkJoinTask.AdaptedCallable<T>(task);
    externalPush(job);
    return job;
}

对于Runnable,如果需要返回值,统一还是返回的ForkJoinTask,之后在外层join得到结果。

public <T> ForkJoinTask<T> submit(Runnable task, T result) {
    ForkJoinTask<T> job = new ForkJoinTask.AdaptedRunnable<T>(task, result);
    externalPush(job);
    return job;
}

也可以采用这种方式:

public ForkJoinTask<?> submit(Runnable task) {
    if (task == null)
        throw new NullPointerException();
    ForkJoinTask<?> job;
    if (task instanceof ForkJoinTask<?>) // avoid re-wrap
        job = (ForkJoinTask<?>) task;
    else
        job = new ForkJoinTask.AdaptedRunnableAction(task);
    externalPush(job);
    return job;
}

实际上通过这些提交方法,我们发现,最关键的入口时externalPush方法。下面我们就对这个方法进行分析。

5.5 外部提交过程分析

现在开始跟踪externalPush的源码,对外部提交过程进行分析。

5.5.1 externalPush

这是外部提交的唯一入口。

/**
 * 尝试将任务添加到提交者当前的队列中,此方法只处理大多数情况,实际上是根据随机数指定一个workQueues的槽位,如果这个位置存在WorkQueue,则加入队列,然后调用signalWork通知其他工作线程来窃取。反之,则通过externalSubmit处理。这只适用于提交队列存在的普通情况。更复杂的逻辑参考externalSubmit。
 *
 * @param task the task. Caller must ensure non-null.
 */
final void externalPush(ForkJoinTask<?> task) {
    WorkQueue[] ws; WorkQueue q; int m;
    //通过ThreadLocalRandom产生随机数
    int r = ThreadLocalRandom.getProbe();
    //线程池的状态
    int rs = runState;
    //如果ws已经完成初始化,且根据随机数定位的index存在workQueue,且cas的方式加锁成功
    if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&
        //此处先用随机数和wq的size取&,之后再取SQMASK,这些操作将多余的位的值去除
        (q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 &&
        //cas的方式加锁 将q中位于QLOCK位置的内存的值如果为0,则改为1,采用cas的方式进行
        U.compareAndSwapInt(q, QLOCK, 0, 1)) {
        ForkJoinTask<?>[] a; int am, n, s;
        //判断q中的task数组是否为空,
        if ((a = q.array) != null &&
            //am为q的长度 这是个固定值,如果这个值大于n n就是目前队列中的元素,实际实这里是判断队列是否有空余的位置
            (am = a.length - 1) > (n = (s = q.top) - q.base)) {
            //j实际上是计算添加到workQueue中的index
            int j = ((am & s) << ASHIFT) + ABASE;
            //将task通过cas的方式插入a的index为j的位置
            U.putOrderedObject(a, j, task);
            //将队列q的QTOP位置的内存加1,实际上就是将TOP增加1
            U.putOrderedInt(q, QTOP, s + 1);
            //以可见的方式将q的QLOCK改为0
            U.putIntVolatile(q, QLOCK, 0);
            //此处,如果队列中的任务小于等于1则通知其他worker来窃取。为什么当任务大于1的时候不通知。而且当没有任务的时候发通知岂不是没有意义?此处不太理解
            if (n <= 1)
                //这是个重点方法,通知其他worker来窃取
                signalWork(ws, q);
            return;
        }
        U.compareAndSwapInt(q, QLOCK, 1, 0);
    }
    //初始化
    externalSubmit(task);
}

实际上externalPush的逻辑只能处理简单的逻辑,对于其他复杂的逻辑,则需要通过externalSubmit提供,而这些简单的逻辑,实际上就是添加到任务队列。这个任务队列的索引一定是偶数:i = m & r & SQMASK。

这个计算过程,实际上由于SQMASK最后一位为0,因此计算的index的最后一位一定为0,这样导致这个值为偶数。也就是说,workQueues的偶数位存放的是外部提交的任务队列。之后提交成功之后,调用signalWork方法让其他的worker来窃取。

总结下,就是当从外部提交一个任务,会随机的选择一个偶数下标位,然后将任务追加到这个WorkQueue里的数组的最后。

但是请注意,这个方法是精简版的任务提交,因为它没有处理线程池的初始化等问题,如果随机到的偶数槽位队列可以提交任务,则就会直接将任务推入队列。否则会调用完整版任务提交方法externalSubmit。这种方式很像重入锁的入队有没有啊,Doug Lea一贯做法。

5.5.2 externalSubmit

这是提交过程中的分支逻辑处理的方法。完整版外部的任务提交方法。

  • 第一步,如果线程池没有初始化会先进行初始化操作,比如工作队列数组的空间分配还有线程池的状态修改等。

  • 第二步,如果随机的偶数槽位队列不为空,则将任务推入队列并调用signalWork方法唤醒线程。

  • 如果第二步槽位为null,则第三步为这个槽位创建队列后再重复循环。如果发生竞争会重新随机槽位。

/**
 *externalPush的完整版本,处理哪些不常用的逻辑。如第一次push的时候进行初始化、此外如果索引队列为空或者被占用,那么创建一个新的任务队列。
 *
 * @param task the task. Caller must ensure non-null.
 */
private void externalSubmit(ForkJoinTask<?> task) {
   //r是随机数,此处双重检测,确保r不为0
    int r;                                    // initialize caller's probe
    if ((r = ThreadLocalRandom.getProbe()) == 0) {
        ThreadLocalRandom.localInit();
        r = ThreadLocalRandom.getProbe();
    }
    //死循环
    for (;;) {
        WorkQueue[] ws; WorkQueue q; int rs, m, k;
        //move默认为false
        boolean move = false;
        //如果runstate小于0 则线程池处于SHUTDOWN状态,配合进行终止
        if ((rs = runState) < 0) {
            //终止的方法 并抛出异常,拒绝该任务
            tryTerminate(false, false);     // help terminate
            throw new RejectedExecutionException();
        }
        //如果状态不为STARTED 说明此时线程池可用
        else if ((rs & STARTED) == 0 ||     // initialize
                 //如果workQueues为null 或者其length小于1 则说明没用初始化
                 ((ws = workQueues) == null || (m = ws.length - 1) < 0)) {
            int ns = 0;
            //对线程池以CAS的方式加锁,从RUNSTATE变为RSLOCK,如果不为RUNSTATE则自旋
            rs = lockRunState();
            try {
                //如果状态为  RSIGNAL RSLOCK  说明加锁成功
                if ((rs & STARTED) == 0) {
                   //用cas的方式初始化STEALCOUNTER
                    U.compareAndSwapObject(this, STEALCOUNTER, null,
                                           new AtomicLong());
                    // create workQueues array with size a power of two
                    //创建workQueues的数组
                    //根据并行度计算得到config,此处确保p在SMASK范围内,即2个字节
                    int p = config & SMASK; // ensure at least 2 slots
                    //n判断p是否大于1,反之则默认按1处理
                    int n = (p > 1) ? p - 1 : 1;
                    //下列过程是找到大于n的最小的2的幂 这个过程之前在HashMap中演示过
                    n |= n >>> 1; n |= n >>> 2;  n |= n >>> 4;
                    n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1;
                    //根据并行度计算得到了n,之后根据n确定workQueues的array的大小,这个数组的大小不会超过2^16
                    workQueues = new WorkQueue[n];
                    //将ns的值修改为STARTED
                    ns = STARTED;
                }
            } finally {
                //最后将状态解锁 此时改为STARTED状态,这个计算过程有一点绕 
                unlockRunState(rs, (rs & ~RSLOCK) | ns);
            }
            //实际上这个分支只是创建了外层的workQueues数组,此时数组内的内容还是全部都是空的 
        }
        //如果根据随机数计算出来的槽位不为空,即索引处的队列已经创建,这个地方是外层死循环再次进入的结果
        //需要注意的是这个k的计算过程,SQMASK最低的位为0,这样就导致,无论随机数r怎么变化,得到的结果总是偶数。
        else if ((q = ws[k = r & m & SQMASK]) != null) {
            //如果这个槽位的workQueue未被锁定,则用cas的方式加锁 将其改为1
            if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) {
                //拿到这个队列中的array
                ForkJoinTask<?>[] a = q.array;
                //s为top索引
                int s = q.top;
                //初始化submitted状态
                boolean submitted = false; // initial submission or resizing
                try {                      // locked version of push
                    //与上面的externalPush一致,此处push到队列中
                    //先判断 数组不为空且数组中有空余位置,能够容纳这个task
                    if ((a != null && a.length > s + 1 - q.base) ||
                        //或者通过初始化的双端队列的数组不为null 
                        (a = q.growArray()) != null) {
                        //计算数组的index
                        int j = (((a.length - 1) & s) << ASHIFT) + ABASE;
                        //在索引index处插入task
                        U.putOrderedObject(a, j, task);
                        //将队列的QTOP加1
                        U.putOrderedInt(q, QTOP, s + 1);
                        //将提交成功状态改为true
                        submitted = true;
                    }
                } finally {
                    //最终采用cas的方式进行解锁 将队列的锁定状态改为0
                    U.compareAndSwapInt(q, QLOCK, 1, 0);
                }
                //如果submitted为true说明数据添加成功,此时调用其他worker来窃取
                if (submitted) {
                    //调用窃取的方法
                    signalWork(ws, q);
                    //退出
                    return;
                }
            }
            //move状态改为true
            move = true;                   // move on failure
        }
        //如果状态不为RSLOCK 上面两个分支都判断过了,那么此处说明这个索引位置没有初始化
        else if (((rs = runState) & RSLOCK) == 0) { // create new queue
           /new一个新队列
            q = new WorkQueue(this, null);
            //hint 记录随机数
            q.hint = r;
            //计算config SHARED_QUEUE 将确保第一位为1 则这个计算出来的config是负数,这与初始化的方法是一致的
            q.config = k | SHARED_QUEUE;
            //将scan状态改为INCATIVE
            q.scanState = INACTIVE;
            //用cas的方式加锁
            rs = lockRunState();  将创建的workQueue push到workQueues的数组中
            // publish index
            if (rs > 0 &&  (ws = workQueues) != null &&
                k < ws.length && ws[k] == null)
                //赋值
                ws[k] = q;                 // else terminated
            //解锁
            unlockRunState(rs, rs & ~RSLOCK);
        }
        else
            //将move改为true
            move = true;                   // move if busy
        if (move)
            //重新计算r
            r = ThreadLocalRandom.advanceProbe(r);
    }
}

三种情况:1、需要初始化。2、该槽位上的workqueue不是null。3、整体数组不为null但是其对应的槽位上的workqueue是null。

把任务放进队列是自旋过程,知道成功为止,所以最后就是把任务放入了其对应的槽位中。

这个地方将产生的无owoner的workQueue放置在索引k的位置,需要注意的是k的计算过程,k= r & m & SQMASK。r是随机数,m是数组的长度,而SQMASK:


最后一位不为1,这就导致不管r如何变化,得到的k最后一位都不为1,这就构造了一个偶数k最后一位为0,k不可能是奇数。

5.5.3 signalWork

任务提交成功后会调用这个方法,它的作用就是激活一个空闲线程或创建一个线程并绑定一个队列在队列数组的奇数槽位。

如果清楚ForkJoinPool中主要成员变量所代表的含义这个方法就可以很容易的理解。它首先去判断是否有空闲的队列也就是通过ctl的低32位,如果没有则会判断是否能在添加线程,可以就会创建。如果有空闲线程则会进行激活。具体实现可以看下面代码:

/**
 * 此处将激活worker Thread。如果工作线程太少则创建,反之则来进行窃取。
 *
 * @param ws the worker array to use to find signallees
 * @param q a WorkQueue --if non-null, don't retry if now empty
 */
final void signalWork(WorkQueue[] ws, WorkQueue q) {
    long c; int sp, i; WorkQueue v; Thread p;
    //如果ctl为负数  ctl初始化的时候就会为负数 如果小于0  说明有任务需要处理
    while ((c = ctl) < 0L) {                       // too few active
        //c为long,强转int 32位的高位都丢弃,此时如果没有修改过ctl那么低位一定为0 可参考前面ctl的推算过程,所以此处sp 为0 sp为0则说明每月空闲的worker
        if ((sp = (int)c) == 0) {                  // no idle workers
            //还是拿c与ADD_WORKER取& 如果不为0 则说明worker太少,需要新增worker
            if ((c & ADD_WORKER) != 0L)            // too few workers
                //通过tryAddWorker 新增worker
                tryAddWorker(c);
            break;
        }
        //再次缺认ws有没有被初始化 如果没有 退出
        if (ws == null)                            // unstarted/terminated
            break;
        //如果ws的length小于sp的最低位 退出
        if (ws.length <= (i = sp & SMASK))         // terminated
            break;
        //如果index处为空 退出
        if ((v = ws[i]) == null)                   // terminating
            break;
        //将sp的低32位取出
        int vs = (sp + SS_SEQ) & ~INACTIVE;        // next scanState
        //计算用sp减去 scanState
        int d = sp - v.scanState;                  // screen CAS
        long nc = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & v.stackPred);
        //采用cas的方式修改ctl 实际上就是加锁 由于ctl的修改可能会导致while循环退出
        if (d == 0 && U.compareAndSwapLong(this, CTL, c, nc)) {
            v.scanState = vs;                      // activate v
            //如果p被park wait中
            if ((p = v.parker) != null)
                //将worker唤醒 
                U.unpark(p);
            //退出
            break;
        }
        //如果此队列为空或者没有task 也退出
        if (q != null && q.base == q.top)          // no more work
            break;
    }
}

可以看到,实际上signalWork就做了两件事情,第一,判断worker是否充足,如果不够,则创建新的worker。第二,判断worker的状态是否被park了,如果park则用unpark唤醒。这样worker就可以取scan其他队列进行窃取了。

5.5.4 tryAddWorker

此方法是在worker不足的时候,添加一个worker来执行的具体类。

/**
 * 尝试新增一个worker,然后增加ctl中记录的worker的数量
 *
 * @param c incoming ctl value, with total count negative and no
 * idle workers.  On CAS failure, c is refreshed and retried if
 * this holds (otherwise, a new worker is not needed).
 */
private void tryAddWorker(long c) {
    //传入的c为外层调用方法的ctl add标记为false
    boolean add = false;
    do {
        long nc = ((AC_MASK & (c + AC_UNIT)) |
                   (TC_MASK & (c + TC_UNIT)));
        //如果此时ctl与外层传入的ctl相等 说明没有被修改
        if (ctl == c) {
            int rs, stop;                 // check if terminating
            //用cas的方式加锁
            if ((stop = (rs = lockRunState()) & STOP) == 0)
                //增加ctl的数量,如果成功 add为ture
                add = U.compareAndSwapLong(this, CTL, c, nc);
            //解锁
            unlockRunState(rs, rs & ~RSLOCK);
            //如果stop不为0 则说明线程池停止 退出
            if (stop != 0)
                break;
            //如果前面增加ctl中的数量成功,那么此处开始创建worker
            if (add) {
                createWorker();
                break;
            }
        }
    //这个while循环, 前半部分与ADD_WORKER取并,最终只会保留第48位,这个位置为1,同时c的低32为为0,
    } while (((c = ctl) & ADD_WORKER) != 0L && (int)c == 0);
}

实际上这个类中,只是做了一些准备过程,增加count,以及加锁判断,最终还是通过createWorker来进行。

5.5.5 createWorker

创建worker的具体方法。

// Creating, registering and deregistering workers

/**
 * 创建并启动一个worker,因为前面已经做了增加count,如果此处出现异常,创建worker不成功,则在deregisterWorker中会判断如果ex不为空,且当前为创建状态的话,会重新进入tryAddWorker方法。
 *
 * @return true if successful
 */
private boolean createWorker() {
    //创建线程的工厂方法
    ForkJoinWorkerThreadFactory fac = factory;
    Throwable ex = null;
    ForkJoinWorkerThread wt = null;
    try {
        //如果工厂方法不为空,则用这个工厂方法创建线程,之后再启动线程,此时newThread将与workQueue绑定
        if (fac != null && (wt = fac.newThread(this)) != null) {
            wt.start();
            return true;
        }
    //如果创建失败,出现了异常 则ex变量有值
    } catch (Throwable rex) {
        ex = rex;
    }
    deregisterWorker(wt, ex);
    return false;
}

此方法只是创建了一个ForkJoinThread,实际上worker还是没有创建。实际上这个创建过程是再newThread(this)中来进行的。

public class ForkJoinWorkerThread extends Thread {

    final ForkJoinPool pool;                // the pool this thread works in
    final ForkJoinPool.WorkQueue workQueue; // work-stealing mechanics

    protected ForkJoinWorkerThread(ForkJoinPool pool) {
        // Use a placeholder until a useful name can be set in registerWorker
        super("aForkJoinWorkerThread");
        this.pool = pool;
        this.workQueue = pool.registerWorker(this);
    }
}

再线程创建成功之后调用registerWorker与之绑定。
如果线程创建失败或者出现异常就要调用deregisterWorker对count进行清理或者解除绑定。

5.5.6 registerWorker

实际上是这个方法,完成worker的创建和绑定。

/**
 * 创建线程,并建立线程与workQueue的关系,此处只会在workQueues数组的奇数位操作
 *
 * @param wt the worker thread
 * @return the worker's queue
 */
final WorkQueue registerWorker(ForkJoinWorkerThread wt) {
    UncaughtExceptionHandler handler;
    //将线程设置为守护线程
    wt.setDaemon(true);                           // configure thread
    //如果没有handler则抛出异常
    if ((handler = ueh) != null)
        wt.setUncaughtExceptionHandler(handler);
    //创建一个workQueue,此时owoner为当前输入的ForkJoinThread
    WorkQueue w = new WorkQueue(this, wt);
    //定义i为0
    int i = 0;                                    // assign a pool index
    //定义mode 
    int mode = config & MODE_MASK;
    //加锁
    int rs = lockRunState();
    try {
        WorkQueue[] ws; int n;                    // skip if no array
        //如果workQueues存在,且长度大于0
        if ((ws = workQueues) != null && (n = ws.length) > 0) {
            //通过魔数计算
            int s = indexSeed += SEED_INCREMENT;  // unlikely to collide
            //m为n-1,而n为数组的初始化长度,第一次创建的时候,n=16,那么m为15
            int m = n - 1;
            //将s左移然后最后一位补上1,之后与奇数m求并集,那么得到的结果必然是奇数
            i = ((s << 1) | 1) & m;               // odd-numbered indices
            //判断i位置是否为空 如果为空,出现了碰撞,则计算步长向后移动 这个步长一定是偶数
            if (ws[i] != null) {                  // collision
                int probes = 0;                   // step by approx half n
                //最小步长为2  n是数组长度,比为偶数,那么如果n小于等于4,则步长为2,反之,则将n右移,将偶数最后一位的0清除,之后再和EVENMASK求并,这样就相当于将原来的长度缩小2倍,并确保是偶数。之后再加上2。那么假定n为16的话,此处计算的step就为10
                int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2;
                //之后再通过while循环,继续判断增加步长之后是否碰撞,如果碰撞,则继续增加步长
                while (ws[i = (i + step) & m] != null) {
                    //如果还是碰撞,且probes增加1之后大于长度n,则会触发扩容,workQueues会扩大2倍 这个probes感觉意义不大
                    if (++probes >= n) {
                        workQueues = ws = Arrays.copyOf(ws, n <<= 1);
                        m = n - 1;
                        //将probes置为0
                        probes = 0;
                    }
                }
            }
            //设置随机数seed
            w.hint = s;                           // use as random seed
            //修改config
            w.config = i | mode;
            //修改scanState为i
            w.scanState = i;                      // publication fence
            //将w设置到i处
            ws[i] = w;
        }
    } finally {
       //cas的方式进行解锁
        unlockRunState(rs, rs & ~RSLOCK);
    }
    //此处设置线程name
    wt.setName(workerNamePrefix.concat(Integer.toString(i >>> 1)));
    return w;
}

再registerWorker中,对工作线程和workQueue进行绑定,并设置到workQueues的奇数索引处。注意这里计算得到奇数索引的算法。

//将s左移然后最后一位补上1,之后与奇数m求并集,那么得到的结果必然是奇数。
i = ((s << 1) | 1) & m;
这个位操作,是我们非常值得注意的地方。

注册worker的逻辑就是在workqueue数组里找一个空的槽位给当前的worker,如果超过了自旋的上限就扩容

5.5.7 deregisterWorker

如果线程创建没有成功,那么count需要回收。以及进行一些清理工作。

/**
 *此方法的主要目的是在创建worker或者启动worker失败之后的回调方法,此时将之前的ctl中增加的count减去。
 *
 * @param wt the worker thread, or null if construction failed
 * @param ex the exception causing failure, or null if none
 */
final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {
    WorkQueue w = null;
    //如果workQueue和thread不为空
    if (wt != null && (w = wt.workQueue) != null) {
        WorkQueue[] ws;                           // remove index from array
        //根据config计算index
        int idx = w.config & SMASK;
        //加锁
        int rs = lockRunState();
        如果ws不为空且length大于idx同时idx处的worker就是workerQueue 则将该idx处的worker移除
        if ((ws = workQueues) != null && ws.length > idx && ws[idx] == w)
            ws[idx] = null;
        //解锁 修改rs状态 
        unlockRunState(rs, rs & ~RSLOCK);
    }
    //后续对count减少
    long c;                                       // decrement counts
    //死循环 cas的方式将ctl修改
    do {} while (!U.compareAndSwapLong
                 (this, CTL, c = ctl, ((AC_MASK & (c - AC_UNIT)) |
                                       (TC_MASK & (c - TC_UNIT)) |
                                       (SP_MASK & c))));
     //入果workQueue不为空  将其中的task取消                              
    if (w != null) {
        w.qlock = -1;                             // ensure set
        w.transferStealCount(this);
        w.cancelAll();                            // cancel remaining tasks
    }
    //死循环 如果为停止状态则配合停止
    for (;;) {                                    // possibly replace
        WorkQueue[] ws; int m, sp;
        if (tryTerminate(false, false) || w == null || w.array == null ||
            (runState & STOP) != 0 || (ws = workQueues) == null ||
            (m = ws.length - 1) < 0)              // already terminating
            break;
        if ((sp = (int)(c = ctl)) != 0) {         // wake up replacement
            if (tryRelease(c, ws[sp & m], AC_UNIT))
                break;
        }
        else if (ex != null && (c & ADD_WORKER) != 0L) {
            tryAddWorker(c);                      // create replacement
            break;
        }
        else                                      // don't need replacement
            break;
    }
    //异常处理
    if (ex == null)                               // help clean on way out
        ForkJoinTask.helpExpungeStaleExceptions();
    else                                          // rethrow
        ForkJoinTask.rethrow(ex);
}

至此,我们通过外部线程push之后,将任务提交到submissionQueue队列之后,会根据并行度以及工作线程的需要创建workQueue。唤醒工作线程进行窃取的操作就完成了,外部线程的调用就结束了,会回到main方法中去等待结果。

5.5.8 外部线程提交过程总结

上述代码调用的过程,我们可以形象的用如下图进行表示:
最开始,workQueues是null状态。在第一次执行的时候,externalSubmit方法中会初始化这个数组。


在这之后,还是在externalSubmit方法的for循环中,完成对任务队列的创建,将任务队列创建在偶数索引处。之后将任务写入这个队列:


此后,任务添加完成,但是没有工作队列进行工作。因此在这之后调用signalWork,通知工作队列开始干活。但是在这个方法执行的过程中,由于工作队列并不存在,没有worker,所以调用tryAddWorker开始创建worker。在createWorker会创建一个worker线程:


但是workQueue还没创建。这是在newthread之后,通过registerWorker创建的,在registerWorker方法中,会在奇数位创建一个workQueue,并将此前创建的线程与之绑定。这样一个worker就成功创建了。


这样就完成了worker创建的全过程。

5.6 workQueue的工作过程(任务执行)

在workQueue创建完成之后,下一步,这些线程的run方法调用后被启动。之后就进入了worker线程的生命周期了。实际上run方方法如下:

public void run() {
    if (workQueue.array == null) { // only run once
        Throwable exception = null;
        try {
            onStart();
            pool.runWorker(workQueue);
        } catch (Throwable ex) {
            exception = ex;
        } finally {
            try {
                onTermination(exception);
            } catch (Throwable ex) {
                if (exception == null)
                    exception = ex;
            } finally {
                pool.deregisterWorker(this, exception);
            }
        }
    }
}

重点执行的式runWorker,此时一旦出错,可以调用deregisterWorker方法进行清理。下面来看看runWorker的详细过程。

5.6.1 runWorker

这是worker工作线程的执行方法。通过死循环,不断scan是否有任务,之后窃取这个任务进行执行。

/**
 * 通过调用线程的run方法,此时开始最外层的runWorker
 */
final void runWorker(WorkQueue w) {
   //初始化队列,这个方法会根据任务进行判断是否需要扩容
    w.growArray();                   // allocate queue
    //hint是采用的魔数的方式增加
    int seed = w.hint;               // initially holds randomization hint
    //如果seed为0 则使用1
    int r = (seed == 0) ? 1 : seed;  // avoid 0 for xorShift
    //死循环
    for (ForkJoinTask<?> t;;) {
       //调用scan方法 对经过魔数计算的r 之后开始进行窃取过程 如果能够窃取 则task不为空
        if ((t = scan(w, r)) != null)
            //运行窃取之后的task
            w.runTask(t);
        //反之则当前线程进行等待
        else if (!awaitWork(w, r))
            break;
        r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
    }
}

通过scan方法从其他队列获得任务。

5.6.2 scan

/**
 * 通过scan方法进行任务窃取,扫描从一个随机位置开始,如果出现竞争则通过魔数继续随机移动,反之则线性移动,直到所有队列上的相同校验连续两次出现为空,则说明没有任何任务可以窃取,因此worker会停止窃取,之后重新扫描,如果找到任务则重新激活,否则返回null,扫描工作应该尽可能少的占用内存,以减少对其他扫描线程的干扰。
 *
 * @param w the worker (via its WorkQueue)
 * @param r a random seed
 * @return a task, or null if none found
 */
private ForkJoinTask<?> scan(WorkQueue w, int r) {
    WorkQueue[] ws; int m;
    //如果workQueues不为空且长度大于1,当前workQueue不为空
    if ((ws = workQueues) != null && (m = ws.length - 1) > 0 && w != null) {
        //ss为扫描状态
        int ss = w.scanState;                     // initially non-negative
        //for循环 这是个死循环  origin将r与m求并,将多余位去除。然后赋值给k
        for (int origin = r & m, k = origin, oldSum = 0, checkSum = 0;;) {
            WorkQueue q; ForkJoinTask<?>[] a; ForkJoinTask<?> t;
            int b, n; long c;
            //如果k处不为空
            if ((q = ws[k]) != null) {
                //如果task大于0
                if ((n = (b = q.base) - q.top) < 0 &&
                    (a = q.array) != null) {      // non-empty
                    //计算i
                    long i = (((a.length - 1) & b) << ASHIFT) + ABASE;
                    //得到i处的task 
                    if ((t = ((ForkJoinTask<?>)
                              U.getObjectVolatile(a, i))) != null &&
                        q.base == b) {
                        //如果扫描状态大于0
                        if (ss >= 0) {
                            //更改a中i的值为空 也就是此处将任务窃取走了
                            if (U.compareAndSwapObject(a, i, t, null)) {
                                //将底部的指针加1
                                q.base = b + 1;
                                //如果n小于-1 则通知工作线程工作
                                if (n < -1)       // signal others
                                    signalWork(ws, q);
                                //将窃取的task返回
                                return t;
                            }
                        }
                        //如果 scan状态小于0 则调用tryRelease方法唤醒哪些wait的worker
                        else if (oldSum == 0 &&   // try to activate
                                 w.scanState < 0)
                            //调用tryRelease方法 后续详细介绍
                            tryRelease(c = ctl, ws[m & (int)c], AC_UNIT);
                    }
                    //如果ss小于0 
                    if (ss < 0)                   // refresh
                        //更改ss
                        ss = w.scanState;
                    r ^= r << 1; r ^= r >>> 3; r ^= r << 10;
                    origin = k = r & m;           // move and rescan
                    oldSum = checkSum = 0;
                    continue;
                }
                checkSum += b;
            }
            //此处判断k,k在此通过+1的方式完成对原有workQueues的遍历
            if ((k = (k + 1) & m) == origin) {    // continue until stable
                if ((ss >= 0 || (ss == (ss = w.scanState))) &&
                    oldSum == (oldSum = checkSum)) {
                    if (ss < 0 || w.qlock < 0)    // already inactive
                        break;
                    int ns = ss | INACTIVE;       // try to inactivate
                    long nc = ((SP_MASK & ns) |
                               (UC_MASK & ((c = ctl) - AC_UNIT)));
                    w.stackPred = (int)c;         // hold prev stack top
                    U.putInt(w, QSCANSTATE, ns);
                    if (U.compareAndSwapLong(this, CTL, c, nc))
                        ss = ns;
                    else
                        w.scanState = ss;         // back out
                }
                checkSum = 0;
            }
        }
    }
    return null;
}

此方法最大的特点就是,根据随机数计算一个k,然后根据k去遍历workQueues,之后看看这个位置是否有数据,如果不为空,则检查checkSum,根据checkSum的状态缺认是否从这个队列中取数据。按之前约定的FIFO或者LIFO取数。这意味着,工作队列对窃取和是否获得本队列中的任务之间并没有优先级,而是根据随机数得到的index,之后对数组进行遍历。

扫描整个队列连续出现两次扫描的checkSum的值相同,说明所有的队列都是空的了 需要去灭活当前的队列。因为两次checkSum的值相同说明两次都便利了所有的队列的base 也就是都是线性的增加k的值,如果有的队列有元素发生竞争失败了会随机移动下标, 很大概率不会形成两次一样checkSum的。

如果scan没有扫描到任务会将这个队列失活,并放入将队列的scanState字段方法ctl的低32位,替换原来的值并将原来的值放入当前队列的stackPred字段构成一个栈。scan没有扫描到任务返回后,runWork方法会调用awaitWork方法阻塞线程。

5.6.3 tryRelease

对于执行完成的worker,则需要进行释放。

/**
 * 如果worker处于空闲worker Stack的workQueue的顶部。则发信号对其进行释放。
 *
 * @param c incoming ctl value
 * @param v if non-null, a worker
 * @param inc the increment to active count (zero when compensating)
 * @return true if successful
 */
private boolean tryRelease(long c, WorkQueue v, long inc) {
    //sp取c的低位,计算vs
    int sp = (int)c, vs = (sp + SS_SEQ) & ~INACTIVE; Thread p;
    //如果v不为空 且v的sancState为sp 
    if (v != null && v.scanState == sp) {          // v is at top of stack
        //计算nc
        long nc = (UC_MASK & (c + inc)) | (SP_MASK & v.stackPred);
        //采用cas的方式 将ctl改为nc
        if (U.compareAndSwapLong(this, CTL, c, nc)) {
            //修改scanState的状态
            v.scanState = vs;
            //如果此时这线程为park状态,则调用unpark
            if ((p = v.parker) != null)
                U.unpark(p);
            return true;
        }
    }
    return false;
}

5.6.4 awaitWork

如果上述scan不到任务呢?也就是说,scan方法没有拿到task,则会调用awaitWork。将当前的线程进行阻塞。

/**
 *如果不能窃取到任务,那么就将worker阻塞。如果停用导致线程池处于静止状态,则检查是否要关闭,如果这不是唯一的工作线程,则等待给定的持续时间,达到超时时间后,如果ctl没有更改,则将这个worker终止,之后唤醒另外一个其他的worker对这个过程进行重复。
 *
 * @param w the calling worker
 * @param r a random seed (for spins)
 * @return false if the worker should terminate
 */
private boolean awaitWork(WorkQueue w, int r) {
    //如果w不为空切w的qlock小于0 则直接返回false 
    if (w == null || w.qlock < 0)                 // w is terminating
        return false;
    //for循环,这是个死循环,定义pred
    for (int pred = w.stackPred, spins = SPINS, ss;;) {
        //如果ss大于0 则返回
        if ((ss = w.scanState) >= 0)
            break;
        //如果spins大于0 
        else if (spins > 0) {
           //计算r
            r ^= r << 6; r ^= r >>> 21; r ^= r << 7;
            //如果r大于0 且spins为0
            if (r >= 0 && --spins == 0) {         // randomize spins
                WorkQueue v; WorkQueue[] ws; int s, j; AtomicLong sc;
                //如果pred不为0 且ws不为空
                if (pred != 0 && (ws = workQueues) != null &&
                     //j<ws.length
                    (j = pred & SMASK) < ws.length &&
                    //j位置处不为空
                    (v = ws[j]) != null &&        // see if pred parking
                     //并且没有park
                    (v.parker == null || v.scanState >= 0))
                    //继续在for循环中自旋
                    spins = SPINS;                // continue spinning
            }
        }
        //如果qlock小于0  返回false
        else if (w.qlock < 0)                     // recheck after spins
            return false;
        //如果被中断
        else if (!Thread.interrupted()) {
            long c, prevctl, parkTime, deadline;
            int ac = (int)((c = ctl) >> AC_SHIFT) + (config & SMASK);
            //如果线程池停止
            if ((ac <= 0 && tryTerminate(false, false)) ||
                (runState & STOP) != 0)           // pool terminating
                return false;
            //最后的等待 获取不到任务 此时采用超时等待  park的方式进行
            if (ac <= 0 && ss == (int)c) {        // is last waiter
                prevctl = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & pred);
                int t = (short)(c >>> TC_SHIFT);  // shrink excess spares
                if (t > 2 && U.compareAndSwapLong(this, CTL, c, prevctl))
                    return false;                 // else use timed wait
                parkTime = IDLE_TIMEOUT * ((t >= 0) ? 1 : 1 - t);
                deadline = System.nanoTime() + parkTime - TIMEOUT_SLOP;
            }
            else
                prevctl = parkTime = deadline = 0L;
            //获取线程
            Thread wt = Thread.currentThread();
            //设置PARKBLOCKER
            U.putObject(wt, PARKBLOCKER, this);   // emulate LockSupport
            w.parker = wt;
            //调用park方法
            if (w.scanState < 0 && ctl == c)      // recheck before park
                U.park(false, parkTime);
            U.putOrderedObject(w, QPARKER, null);
            U.putObject(wt, PARKBLOCKER, null);
            //如果scanState大于0 退出
            if (w.scanState >= 0)
                break;
            //如果已经达到deadline的时间 则返回false
            if (parkTime != 0L && ctl == c &&
                deadline - System.nanoTime() <= 0L &&
                U.compareAndSwapLong(this, CTL, c, prevctl))
                return false;                     // shrink pool
        }
    }
    return true;
}

上述这些方法,如果返回了false,则会导致外层的阻塞不能阻塞住,在runWorker方法中,会退出循环,这个线程就会退出。只有返回了true,那么这个线程就会一直运行,获得任务,循环。

5.6.5 workQueue的执行过程总结

当workQueue上的thread启动之后,这个线程就会调用runWorker方法。之后再runWorker方法中有一个死循环,不断的通过scan方法去扫描任务,实际上就是执行窃取过程。如下图所示,这样通过遍历外层workQueues的方式将会从任务队列中窃取任务进行执行。


当执行之后,会通过fork产生新的任务,将这些任务任务添加到工作队列中去。其他线程继续scan,执行。这个过程不断循环,直到任务全部都执行完成,这样就完成了整个计算过程。

5.7 join

public final V join() {
    int s;
    if ((s = doJoin() & DONE_MASK) != NORMAL)
        reportException(s);//可能被取消或发生异常
    return getRawResult();
}
 
 
private int doJoin() {
    int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
    return (s = status) < 0 ? s :
        ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
        (w = (wt = (ForkJoinWorkerThread)t).workQueue).
        tryUnpush(this) && (s = doExec()) < 0 ? s :
        wt.pool.awaitJoin(w, this, 0L) :
        externalAwaitDone();
    //是否已执行完
        //是 直接返回任务状态
        //否 当前线程是否是ForkJoinWorkerThread
                //是 执行workQueue的tryUnPush方法和doExec方法。这里的意思是移除在top的当前任务,然后自己主动执行
                    //移除成功 返回任务状态
                    //移除失败 调用awaitJoin方法
                //否 执行externalAwaitDone
}
 

final int awaitJoin(WorkQueue w, ForkJoinTask<?> task, long deadline) {
    int s = 0;
    if (task != null && w != null) {
        //将当前任务变为队列的正在join的任务,先前的放到task的pervJoin,形成一个栈。
        ForkJoinTask<?> prevJoin = w.currentJoin;
        U.putOrderedObject(w, QCURRENTJOIN, task);
        CountedCompleter<?> cc = (task instanceof CountedCompleter) ?
            (CountedCompleter<?>)task : null;
        for (;;) {
            //小于0说明完成
            if ((s = task.status) < 0)
                break;
            //CountedCompleter任务由helpComplete来完成join
            if (cc != null)
                helpComplete(w, cc, 0);
            //如果队列为空或 执行任务没有成功则会去帮助偷窃.
                    //执行失败说明任务被偷了。
            //tryRemoveAndExec任务执行成功则会返回false
            //在当前队列任务执行完了或者
                // 或者(在队列中没有找到这个任务且任务没有执行)
            //则这个任务是被偷了,偷窃任务的可能是在join。
            //所以去帮助偷窃者执行他的任务。
            else if (w.base == w.top || w.tryRemoveAndExec(task))
                helpStealer(w, task);
            //如果任务执行成功则会跳出循环
            if ((s = task.status) < 0)
                break;
            long ms, ns;
            if (deadline == 0L)
                ms = 0L;
            else if ((ns = deadline - System.nanoTime()) <= 0L)
                break;
            else if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) <= 0L)
                ms = 1L;
            //尝试补偿,在里面有进行
            if (tryCompensate(w)) {
                //等待
                task.internalWait(ms);
                //活跃线程加1
                U.getAndAddLong(this, CTL, AC_UNIT);
            }
        }
        //还原当前队列正在join的任务
        U.putOrderedObject(w, QCURRENTJOIN, prevJoin);
    }
    //返回任务的状态
    return s;
}

join的task会形成一个栈。每个task都会执行fork,每次fork都会往自己的workqueue的task数组把自己方法进去。

然后在执行join的时候又会把自己弹出来 tryUnpush(this) 。依次执行并形成join栈。这样当任务拆解到足够小的时候,执行join就会返回了,这样它之前的任务也会返回了。

参考:
https://blog.csdn.net/dhaibo1986/article/details/108782916

https://www.cnblogs.com/juniorMa/articles/14241296.html

https://www.cnblogs.com/juniorMa/articles/14241534.html

https://www.cnblogs.com/juniorMa/articles/14242659.html

https://blog.csdn.net/tyrroo/article/details/81483608

https://www.cnblogs.com/yangchen-geek/p/15506272.html

https://blog.csdn.net/qq_40996741/article/details/107147092

相关文章

网友评论

      本文标题:Java并发编程——ForkJoinPool之外部提交及work

      本文链接:https://www.haomeiwen.com/subject/yypsortx.html