其实标题问题的答案就在该类的注释之中,如下:
/**
* Class Worker mainly maintains interrupt control state for
* threads running tasks, along with other minor bookkeeping.
* This class opportunistically extends AbstractQueuedSynchronizer
* to simplify acquiring and releasing a lock surrounding each
* task execution. This protects against interrupts that are
* intended to wake up a worker thread waiting for a task from
* instead interrupting a task being run. We implement a simple
* non-reentrant mutual exclusion lock rather than use
* ReentrantLock because we do not want worker tasks to be able to
* reacquire the lock when they invoke pool control methods like
* setCorePoolSize. Additionally, to suppress interrupts until
* the thread actually starts running tasks, we initialize lock
* state to a negative value, and clear it upon start (in
* runWorker).
*/
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
为了方便阅读,翻译如下:
①
Worker
类主要是维护线程运行任务时的中断控制状态,以及次要的信息记录。② 该类通过继承AbstractQueuedSynchronizer
来简化每个任务执行周围锁的获取和释放。③ 这可以保障中断仅会唤醒等待任务的工作线程,而非中断正在运行的任务。④ 我们实现了一个简单的非可重入互斥锁,而不是使用ReentrantLock
,因为我们不希望工作线程在调用诸如setCorePoolSize
之类的线程池控制方法时能够重新获取该锁。⑤ 此外,为了抑制线程真正开始运行之前的中断,我们将锁的状态值初始化为负值,并在启动时将其清除(runWorker
方法中)。
总的来说,依旧不是很好理解,所以我们一句一句来看。
① Worker 类主要是维护线程运行任务时的中断控制状态,以及次要的信息记录。
这里的中断控制状态,是指其继承类 AbstractQueuedSynchronizer
中的 state
字段,该字段即用作内部加锁,又用做中断控制。
信息记录则是指任务完成数计数器 completedTasks
字段。
② 该类通过继承 AbstractQueuedSynchronizer
来简化每个任务执行周围锁的获取和释放。
这句比较好理解,在覆写父类的 tryAcquire
和 tryRelease
方法基础上,添加自定义的 lock
及 unlock
等方法来提供锁的获取和释放,具体请见源码。
③ 这可以保障中断仅会唤醒等待任务的工作线程,而非中断正在运行的任务。
这里我们需要看一下 interruptIdleWorkers
方法,该方法的作用就是中断等待任务的工作线程,代码如下:
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (ThreadPoolExecutor.Worker w : workers) {
Thread t = w.thread;
// 加锁并中断
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
需要关心的是加锁逻辑 if (!t.isInterrupted() && w.tryLock())
,tryLock
方法实际调用的覆写的 tryAcquire
方法,该方法则尝试将 state
值从 0
修改为 1
,由于 state
值只有在等待任务时才为 0
,而线程运行时该值为 1
,所以只有等待任务的线程才能加锁成功,并执行之后的中断操作。
④ 我们实现了一个简单的非可重入互斥锁,而不是使用 ReentrantLock,因为我们不希望工作线程在调用诸如 setCorePoolSize 之类的线程池控制方法时能够重新获取该锁。
前半句很好理解,就是需要一个非可重入的互斥锁,而 ReentrantLock
是可重入的互斥锁,所以不用。后半句则是说防止 setCorePoolSize
方法再次获取该锁,我们先看该方法的代码:
public void setCorePoolSize(int corePoolSize) {
if (corePoolSize < 0)
throw new IllegalArgumentException();
int delta = corePoolSize - this.corePoolSize;
this.corePoolSize = corePoolSize;
if (workerCountOf(ctl.get()) > corePoolSize)
interruptIdleWorkers();
else if (delta > 0) {
// 省略...
}
}
可以看到该方法会调用上面说到的 interruptIdleWorkers
方法,interruptIdleWorkers
方法会遍历所有的工作线程,尝试加锁成功后执行中断操作,如果执行 setCorePoolSize
方法的是线程池中某个工作线程,在锁可重入的情况下,该工作线程会在之后的 w.tryLock
成功获取锁,并中断自己。为了避免这种情况,就需要有一个非可重入的互斥锁,继承 AbstractQueuedSynchronizer
之后,可以简单快速的实现。
⑤ 此外,为了抑制线程真正开始运行之前的中断,我们将锁的状态值初始化为负值,并在启动时将其清除(runWorker 方法中)
工作线程初始化如下,会将状态字段 state
的值设置为 -1
,这样上面的 w.tryLock
就无法获取锁,也就不能执行中断。
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
// 省略...
}
清除该状态是在 runWorker
方法中,看代码:
final void runWorker(ThreadPoolExecutor.Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// 省略...
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
可以看到方法一开始就会执行 w.unlock
方法,该方法会将 state
值设置为 0
,也就允许被中断了。
总结
基于上面的分析可知,继承 AbstractQueuedSynchronizer
主要作用如下:
- 需要状态值控制中断
- 支持非可重入的互斥锁
网友评论