1.ForkJoinPool
1.1 构造器及域
public ForkJoinPool() {
this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
defaultForkJoinWorkerThreadFactory, null, false);
}
public ForkJoinPool(int parallelism) {
this(parallelism, defaultForkJoinWorkerThreadFactory, null, false);
}
public ForkJoinPool(int parallelism,
ForkJoinWorkerThreadFactory factory,
UncaughtExceptionHandler handler,
boolean asyncMode) {
this(checkParallelism(parallelism),
checkFactory(factory),
handler,
asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
"ForkJoinPool-" + nextPoolId() + "-worker-");
checkPermission();
}
private ForkJoinPool(int parallelism,
ForkJoinWorkerThreadFactory factory,
UncaughtExceptionHandler handler,
int mode,
String workerNamePrefix) {
this.workerNamePrefix = workerNamePrefix;
this.factory = factory;
this.ueh = handler;
this.config = (parallelism & SMASK) | mode;
long np = (long)(-parallelism); // offset ctl counts
this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
}
/*
* Bits and masks for field ctl, packed with 4 16 bit subfields:
* AC: Number of active running workers minus target parallelism
* TC: Number of total workers minus target parallelism
* SS: version count and status of top waiting thread
* ID: poolIndex of top of Treiber stack of waiters
*
* When convenient, we can extract the lower 32 stack top bits
* (including version bits) as sp=(int)ctl. The offsets of counts
* by the target parallelism and the positionings of fields makes
* it possible to perform the most common checks via sign tests of
* fields: When ac is negative, there are not enough active
* workers, when tc is negative, there are not enough total
* workers. When sp is non-zero, there are waiting workers. To
* deal with possibly negative fields, we use casts in and out of
* "short" and/or signed shifts to maintain signedness.
*
* Because it occupies uppermost bits, we can add one active count
* using getAndAddLong of AC_UNIT, rather than CAS, when returning
* from a blocked join. Other updates entail multiple subfields
* and masking, requiring CAS.
*/
// Lower and upper word masks
private static final long SP_MASK = 0xffffffffL;
private static final long UC_MASK = ~SP_MASK;
// Active counts
private static final int AC_SHIFT = 48;
private static final long AC_UNIT = 0x0001L << AC_SHIFT;
private static final long AC_MASK = 0xffffL << AC_SHIFT;
// Total counts
private static final int TC_SHIFT = 32;
private static final long TC_UNIT = 0x0001L << TC_SHIFT;
private static final long TC_MASK = 0xffffL << TC_SHIFT;
private static final long ADD_WORKER = 0x0001L << (TC_SHIFT + 15); // sign
// runState bits: SHUTDOWN must be negative, others arbitrary powers of two
private static final int RSLOCK = 1;
private static final int RSIGNAL = 1 << 1;
private static final int STARTED = 1 << 2;
private static final int STOP = 1 << 29;
private static final int TERMINATED = 1 << 30;
private static final int SHUTDOWN = 1 << 31;
// Instance fields
volatile long ctl; // main pool control
volatile int runState; // lockable status
final int config; // parallelism, mode
int indexSeed; // to generate worker index
volatile WorkQueue[] workQueues; // main registry
final ForkJoinWorkerThreadFactory factory;
final UncaughtExceptionHandler ueh; // per-worker UEH
final String workerNamePrefix; // to create worker name string
volatile AtomicLong stealCounter; // also used as sync monitor
ctl打包了四个功能子域,都是16位:
- AC——活跃的工作线程数 - 目标并行度
- TC——总线程数 - 目标并行度
- SS——栈顶等待线程的版本计数和状态
- ID——等待线程Treiber栈顶的poolIndex
方便时,可以将低32位栈顶位(包括版本位)提取为sp =(int)ctl。 目标并行度和字段位置的计数偏移使得可以通过字段的符号(正负号)测试执行最常见的检查:当ac为负时,没有足够的活跃工作线程,当tc为负时,工作线程总数不够。 当sp非零时,有等待线程。 为了处理可能的负字段,使用“short”和/或有符号移位来维持符号位。
因为AC占据最高16位,所以当从阻塞join返回时,可以使用getAndAddLong和AC_UNIT增加一个活动计数而不是CAS。 其他更新需要多个子字段和掩码,需要CAS。
2.execute
2.1 外部提交任务
- execute异步执行,提交后就不管了
- invoke要等着返回结果
- submit返回task,可以进行控制:比如获取、取消等操作
public void execute(ForkJoinTask<?> task) {
if (task == null)
throw new NullPointerException();
externalPush(task);
}
public void execute(Runnable task) {
if (task == null)
throw new NullPointerException();
ForkJoinTask<?> job;
if (task instanceof ForkJoinTask<?>) // avoid re-wrap
job = (ForkJoinTask<?>) task;
else
job = new ForkJoinTask.RunnableExecuteAction(task);
externalPush(job);
}
public <T> T invoke(ForkJoinTask<T> task) {
if (task == null)
throw new NullPointerException();
externalPush(task);
return task.join();
}
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
if (task == null)
throw new NullPointerException();
externalPush(task);
return task;
}
核心是externalPush:
/**
* Tries to add the given task to a submission queue at
* submitter's current queue. Only the (vastly) most common path
* is directly handled in this method, while screening for need
* for externalSubmit.
*
* @param task the task. Caller must ensure non-null.
*/
final void externalPush(ForkJoinTask<?> task) {
WorkQueue[] ws; WorkQueue q; int m;
int r = ThreadLocalRandom.getProbe();
int rs = runState;
if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&
(q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 &&
U.compareAndSwapInt(q, QLOCK, 0, 1)) {
ForkJoinTask<?>[] a; int am, n, s;
if ((a = q.array) != null &&
(am = a.length - 1) > (n = (s = q.top) - q.base)) {
int j = ((am & s) << ASHIFT) + ABASE;
U.putOrderedObject(a, j, task);
U.putOrderedInt(q, QTOP, s + 1);
U.putIntVolatile(q, QLOCK, 0);
if (n <= 1)
signalWork(ws, q);
return;
}
U.compareAndSwapInt(q, QLOCK, 1, 0);
}
externalSubmit(task);
}
- 使用ThreadLocalRandom:
Random,在并发下,多个线程同时计算种子需要用到同一个原子变量。由于更新操作使用CAS,同时执行只有一个线程成功,其他线程的大量自旋造成性能损失,ThreadLocalRandom继承Random,对此进行了改进。
hreadLocalRandom运用了ThreadLocal,每个线程内部维护一个种子变量。 - 外部提交的task,放在偶数位下标的队列上,SQMASK = 0x007e = 1111110,任何数和 SQMASK 进行 & 运算 都会是偶数
- qlock是WorkQueue的域,1表示加锁,< 0表示终止,其他情况为0。对WorkQueue进行操作前,先要获取其qlock锁。放入队列后解锁。
- 将任务task放到到队列q的top位置,也即压入栈顶。然后top加一。
- 当队列之前只有一个任务,现在又加入一个任务后,调用signalWork(ws, q):如果活动线程太少,会创建或者激活一个工作线程。
externalPush和externalSubmit:
- externalSubmit是完整版的externalPush,在任务首次提交时,externalSubmit初始化workQueues及其他相关属性;后续再向池中提交的任务都是通过externalPush来完成。
/**
* Full version of externalPush, handling uncommon cases, as well
* as performing secondary initialization upon the first
* submission of the first task to the pool. It also detects
* first submission by an external thread and creates a new shared
* queue if the one at index if empty or contended.
*
* @param task the task. Caller must ensure non-null.
*/
private void externalSubmit(ForkJoinTask<?> task) {
int r; // initialize caller's probe
if ((r = ThreadLocalRandom.getProbe()) == 0) {
ThreadLocalRandom.localInit();
r = ThreadLocalRandom.getProbe();
}
for (;;) {
WorkQueue[] ws; WorkQueue q; int rs, m, k;
boolean move = false;
if ((rs = runState) < 0) {
tryTerminate(false, false); // help terminate
throw new RejectedExecutionException();
}
else if ((rs & STARTED) == 0 || // initialize
((ws = workQueues) == null || (m = ws.length - 1) < 0)) {
int ns = 0;
rs = lockRunState();
try {
if ((rs & STARTED) == 0) {
U.compareAndSwapObject(this, STEALCOUNTER, null,
new AtomicLong());
// create workQueues array with size a power of two
int p = config & SMASK; // ensure at least 2 slots
int n = (p > 1) ? p - 1 : 1;
n |= n >>> 1; n |= n >>> 2; n |= n >>> 4;
n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1;
workQueues = new WorkQueue[n];
ns = STARTED;
}
} finally {
unlockRunState(rs, (rs & ~RSLOCK) | ns);
}
}
else if ((q = ws[k = r & m & SQMASK]) != null) {
if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) {
ForkJoinTask<?>[] a = q.array;
int s = q.top;
boolean submitted = false; // initial submission or resizing
try { // locked version of push
if ((a != null && a.length > s + 1 - q.base) ||
(a = q.growArray()) != null) {
int j = (((a.length - 1) & s) << ASHIFT) + ABASE;
U.putOrderedObject(a, j, task);
U.putOrderedInt(q, QTOP, s + 1);
submitted = true;
}
} finally {
U.compareAndSwapInt(q, QLOCK, 1, 0);
}
if (submitted) {
signalWork(ws, q);
return;
}
}
move = true; // move on failure
}
else if (((rs = runState) & RSLOCK) == 0) { // create new queue
q = new WorkQueue(this, null);
q.hint = r;
q.config = k | SHARED_QUEUE;
q.scanState = INACTIVE;
rs = lockRunState(); // publish index
if (rs > 0 && (ws = workQueues) != null &&
k < ws.length && ws[k] == null)
ws[k] = q; // else terminated
unlockRunState(rs, rs & ~RSLOCK);
}
else
move = true; // move if busy
if (move)
r = ThreadLocalRandom.advanceProbe(r);
}
}
- 初始化调用线程的探针值,用于计算WorkQueue索引
- 如果池已关闭,则帮助终止
- 如果workQueues数组并未初始化,则进行初始化
初始化过程需要加锁:lockRunState()
初始化stealCounter
创建workQueues数组,容量为2的幂
然后更改runState,赋值为STARTED - 数组已初始化,并且获取的偶数槽不为空,则放到对应的WorkQueue中去,与externalPush相同(使用WorkQueue的qlock锁)
- 此时表明偶数槽为空,则创建该槽,并放到workQueues中去
关于lockRunState的作用:
/**
* Acquires the runState lock; returns current (locked) runState.
*/
private int lockRunState() {
int rs;
return ((((rs = runState) & RSLOCK) != 0 ||
!U.compareAndSwapInt(this, RUNSTATE, rs, rs |= RSLOCK)) ?
awaitRunStateLock() : rs);
}
如果runState已经加锁,或者CAS加锁失败,则进入awaitRunStateLock()。
/**
* Spins and/or blocks until runstate lock is available. See
* above for explanation.
*/
private int awaitRunStateLock() {
Object lock;
boolean wasInterrupted = false;
for (int spins = SPINS, r = 0, rs, ns;;) {
if (((rs = runState) & RSLOCK) == 0) {
if (U.compareAndSwapInt(this, RUNSTATE, rs, ns = rs | RSLOCK)) {
if (wasInterrupted) {
try {
Thread.currentThread().interrupt();
} catch (SecurityException ignore) {
}
}
return ns;
}
}
else if (r == 0)
r = ThreadLocalRandom.nextSecondarySeed();
else if (spins > 0) {
r ^= r << 6; r ^= r >>> 21; r ^= r << 7; // xorshift
if (r >= 0)
--spins;
}
else if ((rs & STARTED) == 0 || (lock = stealCounter) == null)
Thread.yield(); // initialization race
else if (U.compareAndSwapInt(this, RUNSTATE, rs, rs | RSIGNAL)) {
synchronized (lock) {
if ((runState & RSIGNAL) != 0) {
try {
lock.wait();
} catch (InterruptedException ie) {
if (!(Thread.currentThread() instanceof
ForkJoinWorkerThread))
wasInterrupted = true;
}
}
else
lock.notifyAll();
}
}
}
}
这是经典的自旋+yield+阻塞组合的方法。
- 先尝试加锁,成功则返回
- 自旋+xorshift
- 如果获得锁的线程还未初始化好,则yield
- 设置runState为RSIGNAL,并用stealCounter进行加锁,加锁成功后,lock.wait(),使用wait/notifyAll模式
- unlockRunState会进行唤醒lock.notifyAll();
/**
* Tries to create or activate a worker if too few are active.
*
* @param ws the worker array to use to find signallees
* @param q a WorkQueue --if non-null, don't retry if now empty
*/
final void signalWork(WorkQueue[] ws, WorkQueue q) {
long c; int sp, i; WorkQueue v; Thread p;
while ((c = ctl) < 0L) { // too few active
if ((sp = (int)c) == 0) { // no idle workers
if ((c & ADD_WORKER) != 0L) // too few workers
tryAddWorker(c);
break;
}
if (ws == null) // unstarted/terminated
break;
if (ws.length <= (i = sp & SMASK)) // terminated
break;
if ((v = ws[i]) == null) // terminating
break;
int vs = (sp + SS_SEQ) & ~INACTIVE; // next scanState
int d = sp - v.scanState; // screen CAS
long nc = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & v.stackPred);
if (d == 0 && U.compareAndSwapLong(this, CTL, c, nc)) {
v.scanState = vs; // activate v
if ((p = v.parker) != null)
U.unpark(p);
break;
}
if (q != null && q.base == q.top) // no more work
break;
}
}
- 如果活动线程太少,且没有空闲线程,且总线程数也太少,则tryAddWorker
只需要判断AC和TC的符号位,如果为1,则表示太少。
(c = ctl) < 0L表示活动线程太少
(c & ADD_WORKER) != 0L表示总线程数太少
sp如果非0,表示有空闲线程,如果为0,则表示没有空闲线程。 - vs = (sp + SS_SEQ) & ~INACTIVE加上版本戳SS_SEQ避免ABA问题;
将活跃线程数加1,并更新ctl
如果成功,更新栈顶的scanState(加了一个版本戳SS_SEQ)
唤醒阻塞的线程
2.2 内部提交任务
3.WorkQueue
// Constants shared across ForkJoinPool and WorkQueue
// Bounds
static final int SMASK = 0xffff; // short bits == max index
static final int MAX_CAP = 0x7fff; // max #workers - 1
static final int EVENMASK = 0xfffe; // even short bits
static final int SQMASK = 0x007e; // max 64 (even) slots
// Masks and units for WorkQueue.scanState and ctl sp subfield
static final int SCANNING = 1; // false when running tasks
static final int INACTIVE = 1 << 31; // must be negative
static final int SS_SEQ = 1 << 16; // version count
// Mode bits for ForkJoinPool.config and WorkQueue.config
static final int MODE_MASK = 0xffff << 16; // top half of int
static final int LIFO_QUEUE = 0;
static final int FIFO_QUEUE = 1 << 16;
static final int SHARED_QUEUE = 1 << 31; // must be negative
/**
* Queues supporting work-stealing as well as external task
* submission. See above for descriptions and algorithms.
* Performance on most platforms is very sensitive to placement of
* instances of both WorkQueues and their arrays -- we absolutely
* do not want multiple WorkQueue instances or multiple queue
* arrays sharing cache lines. The @Contended annotation alerts
* JVMs to try to keep instances apart.
*/
@sun.misc.Contended
static final class WorkQueue {
/**
* Capacity of work-stealing queue array upon initialization.
* Must be a power of two; at least 4, but should be larger to
* reduce or eliminate cacheline sharing among queues.
* Currently, it is much larger, as a partial workaround for
* the fact that JVMs often place arrays in locations that
* share GC bookkeeping (especially cardmarks) such that
* per-write accesses encounter serious memory contention.
*/
static final int INITIAL_QUEUE_CAPACITY = 1 << 13;
/**
* Maximum size for queue arrays. Must be a power of two less
* than or equal to 1 << (31 - width of array entry) to ensure
* lack of wraparound of index calculations, but defined to a
* value a bit less than this to help users trap runaway
* programs before saturating systems.
*/
static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M
// Instance fields
volatile int scanState; // versioned, <0: inactive; odd:scanning
int stackPred; // pool stack (ctl) predecessor
int nsteals; // number of steals
int hint; // randomization and stealer index hint
int config; // pool index and mode
volatile int qlock; // 1: locked, < 0: terminate; else 0
volatile int base; // index of next slot for poll
int top; // index of next slot for push
ForkJoinTask<?>[] array; // the elements (initially unallocated)
final ForkJoinPool pool; // the containing pool (may be null)
final ForkJoinWorkerThread owner; // owning thread or null if shared
volatile Thread parker; // == owner during call to park; else null
volatile ForkJoinTask<?> currentJoin; // task being joined in awaitJoin
volatile ForkJoinTask<?> currentSteal; // mainly used by helpStealer
支持工作窃取和外部任务提交的队列,在大多数平台上性能跟在WorkQueues和它们的数组中放入实例的性能息息相关。因此不希望多个WorkQueue和多个数组共享缓存行,@Contended注释让JVMs尽量保持实例之间的距离。
scanState:工作线程(worker)和线程池(pool)都使用了 scanState,通过 scanState 可以管理和追踪工作线程是否为 INACTIVE(可能正在阻塞等待唤醒)状态,也可以判断任务是否为 SCANNING 状态(当两者都不是时,它就是正在运行的任务)。当一个工作线程处于灭活状态(INACTIVE),它的scanState被设置为禁止执行任务,但是即便如此它也必须扫描一次以避免队列争用。注意,scanState 的更新在队列CAS释放之后(会有延迟)。在工作线程入队后,scanState的低16位必须持有它在池中的索引,所以我们在初始化时就将索引设置好(参考 registerWorker 方法),并将其一直保存在那里或在必要时恢复它。
特殊形式的Deques只支持四种可能的端操作中的三种 - push,pop和poll(也称为steal),进一步的限制是push和pop仅从拥有的线程调用,而poll可以从其他线程调用。
3.1 push
/**
* Pushes a task. Call only by owner in unshared queues. (The
* shared-queue version is embedded in method externalPush.)
*
* @param task the task. Caller must ensure non-null.
* @throws RejectedExecutionException if array cannot be resized
*/
final void push(ForkJoinTask<?> task) {
ForkJoinTask<?>[] a; ForkJoinPool p;
int b = base, s = top, n;
if ((a = array) != null) { // ignore if queue removed
int m = a.length - 1; // fenced write for task visibility
U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);
U.putOrderedInt(this, QTOP, s + 1);
if ((n = s - b) <= 1) {
if ((p = pool) != null)
p.signalWork(p.workQueues, this);
}
else if (n >= m)
growArray();
}
}
非共享队列的拥有者调用,共享队列调用externalPush。
- 放到队列中,并将top+1,如果原来只有一个元素,成功加入一个后,会调用signalWork(有空闲线程则唤醒,如果活动线程太少,会创建一个新线程。)
- 如果空间不够大,扩容为两倍
3.2 pop
/**
* Takes next task, if one exists, in LIFO order. Call only
* by owner in unshared queues.
*/
final ForkJoinTask<?> pop() {
ForkJoinTask<?>[] a; ForkJoinTask<?> t; int m;
if ((a = array) != null && (m = a.length - 1) >= 0) {
for (int s; (s = top - 1) - base >= 0;) {
long j = ((m & s) << ASHIFT) + ABASE;
if ((t = (ForkJoinTask<?>)U.getObject(a, j)) == null)
break;
if (U.compareAndSwapObject(a, j, t, null)) {
U.putOrderedInt(this, QTOP, s);
return t;
}
}
}
return null;
}
从top弹出一个任务,并更新top减一。
3.3 poll
/**
* Takes next task, if one exists, in FIFO order.
*/
final ForkJoinTask<?> poll() {
ForkJoinTask<?>[] a; int b; ForkJoinTask<?> t;
while ((b = base) - top < 0 && (a = array) != null) {
int j = (((a.length - 1) & b) << ASHIFT) + ABASE;
t = (ForkJoinTask<?>)U.getObjectVolatile(a, j);
if (base == b) {
if (t != null) {
if (U.compareAndSwapObject(a, j, t, null)) {
base = b + 1;
return t;
}
}
else if (b + 1 == top) // now empty
break;
}
}
return null;
}
从base开始取。
4.ForkJoinWorkerThread
/**
* Tries to add one worker, incrementing ctl counts before doing
* so, relying on createWorker to back out on failure.
*
* @param c incoming ctl value, with total count negative and no
* idle workers. On CAS failure, c is refreshed and retried if
* this holds (otherwise, a new worker is not needed).
*/
private void tryAddWorker(long c) {
boolean add = false;
do {
long nc = ((AC_MASK & (c + AC_UNIT)) |
(TC_MASK & (c + TC_UNIT)));
if (ctl == c) {
int rs, stop; // check if terminating
if ((stop = (rs = lockRunState()) & STOP) == 0)
add = U.compareAndSwapLong(this, CTL, c, nc);
unlockRunState(rs, rs & ~RSLOCK);
if (stop != 0)
break;
if (add) {
createWorker();
break;
}
}
} while (((c = ctl) & ADD_WORKER) != 0L && (int)c == 0);
}
/**
* Tries to construct and start one worker. Assumes that total
* count has already been incremented as a reservation. Invokes
* deregisterWorker on any failure.
*
* @return true if successful
*/
private boolean createWorker() {
ForkJoinWorkerThreadFactory fac = factory;
Throwable ex = null;
ForkJoinWorkerThread wt = null;
try {
if (fac != null && (wt = fac.newThread(this)) != null) {
wt.start();
return true;
}
} catch (Throwable rex) {
ex = rex;
}
deregisterWorker(wt, ex);
return false;
}
4.1 构造器
public class ForkJoinWorkerThread extends Thread {
/*
* ForkJoinWorkerThreads are managed by ForkJoinPools and perform
* ForkJoinTasks. For explanation, see the internal documentation
* of class ForkJoinPool.
*
* This class just maintains links to its pool and WorkQueue. The
* pool field is set immediately upon construction, but the
* workQueue field is not set until a call to registerWorker
* completes. This leads to a visibility race, that is tolerated
* by requiring that the workQueue field is only accessed by the
* owning thread.
*
* Support for (non-public) subclass InnocuousForkJoinWorkerThread
* requires that we break quite a lot of encapsulation (via Unsafe)
* both here and in the subclass to access and set Thread fields.
*/
final ForkJoinPool pool; // the pool this thread works in
final ForkJoinPool.WorkQueue workQueue; // work-stealing mechanics
/**
* Creates a ForkJoinWorkerThread operating in the given pool.
*
* @param pool the pool this thread works in
* @throws NullPointerException if pool is null
*/
protected ForkJoinWorkerThread(ForkJoinPool pool) {
// Use a placeholder until a useful name can be set in registerWorker
super("aForkJoinWorkerThread");
this.pool = pool;
this.workQueue = pool.registerWorker(this);
}
/**
* Callback from ForkJoinWorkerThread constructor to establish and
* record its WorkQueue.
*
* @param wt the worker thread
* @return the worker's queue
*/
final WorkQueue registerWorker(ForkJoinWorkerThread wt) {
UncaughtExceptionHandler handler;
wt.setDaemon(true); // configure thread
if ((handler = ueh) != null)
wt.setUncaughtExceptionHandler(handler);
WorkQueue w = new WorkQueue(this, wt);
int i = 0; // assign a pool index
int mode = config & MODE_MASK;
int rs = lockRunState();
try {
WorkQueue[] ws; int n; // skip if no array
if ((ws = workQueues) != null && (n = ws.length) > 0) {
int s = indexSeed += SEED_INCREMENT; // unlikely to collide
int m = n - 1;
i = ((s << 1) | 1) & m; // odd-numbered indices
if (ws[i] != null) { // collision
int probes = 0; // step by approx half n
int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2;
while (ws[i = (i + step) & m] != null) {
if (++probes >= n) {
workQueues = ws = Arrays.copyOf(ws, n <<= 1);
m = n - 1;
probes = 0;
}
}
}
w.hint = s; // use as random seed
w.config = i | mode;
w.scanState = i; // publication fence
ws[i] = w;
}
} finally {
unlockRunState(rs, rs & ~RSLOCK);
}
wt.setName(workerNamePrefix.concat(Integer.toString(i >>> 1)));
return w;
}
- step1.设置为守护线程
- step2.为当前线程新建WorkQueue。
- step3.在workQueues中找到为空的奇数索引位置,设置WorkQueue的属性(config低16位为workQueues中的索引,高16位为mode),将WorkQueue放到workQueues中去。
4.2 run
/**
* This method is required to be public, but should never be
* called explicitly. It performs the main run loop to execute
* {@link ForkJoinTask}s.
*/
public void run() {
if (workQueue.array == null) { // only run once
Throwable exception = null;
try {
onStart();
pool.runWorker(workQueue);
} catch (Throwable ex) {
exception = ex;
} finally {
try {
onTermination(exception);
} catch (Throwable ex) {
if (exception == null)
exception = ex;
} finally {
pool.deregisterWorker(this, exception);
}
}
}
}
/**
* Top-level runloop for workers, called by ForkJoinWorkerThread.run.
*/
final void runWorker(WorkQueue w) {
w.growArray(); // allocate queue
int seed = w.hint; // initially holds randomization hint
int r = (seed == 0) ? 1 : seed; // avoid 0 for xorShift
for (ForkJoinTask<?> t;;) {
if ((t = scan(w, r)) != null)
w.runTask(t);
else if (!awaitWork(w, r))
break;
r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
}
}
- step1.scan方法扫描获取任务,如果为扫描到任务,则调用awaitWork等待,直到工作线程/线程池终止或等待超时
- step2.调用WorkQueue .runTask运行任务
/**
* Scans for and tries to steal a top-level task. Scans start at a
* random location, randomly moving on apparent contention,
* otherwise continuing linearly until reaching two consecutive
* empty passes over all queues with the same checksum (summing
* each base index of each queue, that moves on each steal), at
* which point the worker tries to inactivate and then re-scans,
* attempting to re-activate (itself or some other worker) if
* finding a task; otherwise returning null to await work. Scans
* otherwise touch as little memory as possible, to reduce
* disruption on other scanning threads.
*
* @param w the worker (via its WorkQueue)
* @param r a random seed
* @return a task, or null if none found
*/
private ForkJoinTask<?> scan(WorkQueue w, int r) {
WorkQueue[] ws; int m;
if ((ws = workQueues) != null && (m = ws.length - 1) > 0 && w != null) {
int ss = w.scanState; // initially non-negative
for (int origin = r & m, k = origin, oldSum = 0, checkSum = 0;;) {
WorkQueue q; ForkJoinTask<?>[] a; ForkJoinTask<?> t;
int b, n; long c;
if ((q = ws[k]) != null) {
if ((n = (b = q.base) - q.top) < 0 &&
(a = q.array) != null) { // non-empty
long i = (((a.length - 1) & b) << ASHIFT) + ABASE;
if ((t = ((ForkJoinTask<?>)
U.getObjectVolatile(a, i))) != null &&
q.base == b) {
if (ss >= 0) {
if (U.compareAndSwapObject(a, i, t, null)) {
q.base = b + 1;
if (n < -1) // signal others
signalWork(ws, q);
return t;
}
}
else if (oldSum == 0 && // try to activate
w.scanState < 0)
tryRelease(c = ctl, ws[m & (int)c], AC_UNIT);
}
if (ss < 0) // refresh
ss = w.scanState;
r ^= r << 1; r ^= r >>> 3; r ^= r << 10;
origin = k = r & m; // move and rescan
oldSum = checkSum = 0;
continue;
}
checkSum += b;
}
if ((k = (k + 1) & m) == origin) { // continue until stable
if ((ss >= 0 || (ss == (ss = w.scanState))) &&
oldSum == (oldSum = checkSum)) {
if (ss < 0 || w.qlock < 0) // already inactive
break;
int ns = ss | INACTIVE; // try to inactivate
long nc = ((SP_MASK & ns) |
(UC_MASK & ((c = ctl) - AC_UNIT)));
w.stackPred = (int)c; // hold prev stack top
U.putInt(w, QSCANSTATE, ns);
if (U.compareAndSwapLong(this, CTL, c, nc))
ss = ns;
else
w.scanState = ss; // back out
}
checkSum = 0;
}
}
}
return null;
}
扫描并试图窃取top任务。 扫描从一个随机位置开始,发生明显争用时随机移动,否则线性地继续直到在所有队列上扫描两趟都为空且具有相同校验和(对每个队列的每个base索引求和,每次窃取时移动),此时工作线程尝试停用然后重新扫描,如果找到任务则尝试重新激活(本身或其他工作线程); 否则返回null以等待任务。 扫描否则会尽可能少地触及内存,以减少对其他扫描线程的干扰。
扫描并尝试偷取一个任务。也就是说并不一定会执行当前 WorkQueue 中的任务,而是可以偷取别的Worker的任务来执行。
函数的大概执行流程如下:
- 取随机位置的一个 WorkQueue;
- 获取base位的 ForkJoinTask,成功取到后更新base位并返回任务;如果取到的 WorkQueue 中任务数大于1,则调用signalWork创建或唤醒其他工作线程;
- 如果当前工作线程处于不活跃状态(INACTIVE),则调用tryRelease尝试唤醒栈顶工作线程来执行。tryRelease源码如下:
/**
* Executes the given task and any remaining local tasks.
*/
final void runTask(ForkJoinTask<?> task) {
if (task != null) {
scanState &= ~SCANNING; // mark as busy
(currentSteal = task).doExec();
U.putOrderedObject(this, QCURRENTSTEAL, null); // release for GC
execLocalTasks();
ForkJoinWorkerThread thread = owner;
if (++nsteals < 0) // collect on overflow
transferStealCount(pool);
scanState |= SCANNING;
if (thread != null)
thread.afterTopLevelExec();
}
}
- step1.标记scanState为正在执行状态
- step2.更新currentSteal为当前获取到的任务,并执行
- step3.调用execLocalTasks依次执行当前WorkerQueue中的任务
- step4.更新偷取的任务数
- step5.设置状态为SCANNING,然后该线程重复扫描过程。
/**
* Possibly blocks worker w waiting for a task to steal, or
* returns false if the worker should terminate. If inactivating
* w has caused the pool to become quiescent, checks for pool
* termination, and, so long as this is not the only worker, waits
* for up to a given duration. On timeout, if ctl has not
* changed, terminates the worker, which will in turn wake up
* another worker to possibly repeat this process.
*
* @param w the calling worker
* @param r a random seed (for spins)
* @return false if the worker should terminate
*/
private boolean awaitWork(WorkQueue w, int r) {
if (w == null || w.qlock < 0) // w is terminating
return false;
for (int pred = w.stackPred, spins = SPINS, ss;;) {
if ((ss = w.scanState) >= 0)
break;
else if (spins > 0) {
r ^= r << 6; r ^= r >>> 21; r ^= r << 7;
if (r >= 0 && --spins == 0) { // randomize spins
WorkQueue v; WorkQueue[] ws; int s, j; AtomicLong sc;
if (pred != 0 && (ws = workQueues) != null &&
(j = pred & SMASK) < ws.length &&
(v = ws[j]) != null && // see if pred parking
(v.parker == null || v.scanState >= 0))
spins = SPINS; // continue spinning
}
}
else if (w.qlock < 0) // recheck after spins
return false;
else if (!Thread.interrupted()) {
long c, prevctl, parkTime, deadline;
int ac = (int)((c = ctl) >> AC_SHIFT) + (config & SMASK);
if ((ac <= 0 && tryTerminate(false, false)) ||
(runState & STOP) != 0) // pool terminating
return false;
if (ac <= 0 && ss == (int)c) { // is last waiter
prevctl = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & pred);
int t = (short)(c >>> TC_SHIFT); // shrink excess spares
if (t > 2 && U.compareAndSwapLong(this, CTL, c, prevctl))
return false; // else use timed wait
parkTime = IDLE_TIMEOUT * ((t >= 0) ? 1 : 1 - t);
deadline = System.nanoTime() + parkTime - TIMEOUT_SLOP;
}
else
prevctl = parkTime = deadline = 0L;
Thread wt = Thread.currentThread();
U.putObject(wt, PARKBLOCKER, this); // emulate LockSupport
w.parker = wt;
if (w.scanState < 0 && ctl == c) // recheck before park
U.park(false, parkTime);
U.putOrderedObject(w, QPARKER, null);
U.putObject(wt, PARKBLOCKER, null);
if (w.scanState >= 0)
break;
if (parkTime != 0L && ctl == c &&
deadline - System.nanoTime() <= 0L &&
U.compareAndSwapLong(this, CTL, c, prevctl))
return false; // shrink pool
}
}
return true;
}
可能会阻止工作线程等待任务窃取,或者如果工作线程应该终止则返回false。 如果停用w导致池变为静止,则检查池终止,并且,只要这不是唯一的工作线程,等待直到给定的持续时间。 在超时时,如果ctl没有改变,则终止worker,这将唤醒另一个worker可能重复此过程。
等到过期或被唤醒后如果发现自己是scanning(scanState >= 0)状态,说明已经等到任务,跳出等待返回true继续 scan,否则的更新ctl并返回false。
5.总结
5.1 四大构件
-
线程池ForkJoinPool,核心的ctl打包了AC、TC、SS和ID,runState用于操作workQueues数组。
-
工作线程ForkJoinWorkerThread,每个工作线程都会注册一个自己的WorkQueue,该WorkQueue放在workQueues数组奇数索引处。线程执行时,不一定会执行当前 WorkQueue 中的任务,而是可以偷取别的Worker的任务来执行。
-
队列WorkQueue,支持工作窃取和外部任务提交的双端队列,只支持四种可能的端操作中的三种 - push,pop和poll(也称为steal),进一步的限制是push和pop仅从拥有的线程调用,而poll可以从其他线程调用。
-
任务ForkJoinTask。
1)任务类型
RecursiveAction——不返回结果的计算
RecursiveTask——返回结果
CountedCompleter——已完成的操作触发其他操作
2)内部提交
fork——直接加入到当前线程的workQueue中
invoke——提交任务等待任务完成并获取结果(当前线程执行)
join——等待任务完成并获取结果,尝试在当前线程中开始执行。
3)任务状态
当状态完成时,为负数,表示正常完成、取消或者异常;
阻塞等待的任务设置了SIGNAL
5.2 整体的运行机理
-
ForkJoinPool适合分治算法
-
任务提交分为外部和内部提交
-
整体运行图
网友评论