work 工作线程
通过上节我们将任务提交给work。并启动work线程。在我们看run方法之前我们看下work类的整体设计。
工作线程Worker的设计
private final class Worker extends AbstractQueuedSynchronizer
implements Runnable {
-
work的本质是一个Runnable,而创建一个work线程是使用ThreadFactory创建一个Thread并将work做其属性new Tread(work),同时将这个thread设置的work属性中。多么巧妙的设计!!
-
work为什么需要AQS同步状态,因为同一个时间只能完成一个任务。因而需要在其开始工作和结束工作获取同步状态(加锁),释放同步状态(解锁)
当它的state属性为0时表示unlock,state为1时表示lock。任务执行时必须在lock状态的保护下,防止出现同步问题。因此当Worker处于lock状态时,表示它正在运行,当它处于unlock状态时,表示它“空闲”。当它空闲超过keepAliveTime时,就有可能被回收。
/**
* work使用AQS同步锁,用来判断当前work能否接收新任务
*
* 同步状态0,表示空闲 可以接收新任务
* 同步状态1,表示正在执行任务 无法接收新任务
*
* 获取同步状态将 同步状态设置为1 ,释放同步状态设置为0
*/
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
private static final long serialVersionUID = 6138294804551838833L;
/** 工作线程*/
final Thread thread;
/** 初始化Worker,分配的第一个任务 */
Runnable firstTask;
/** 每个work执行的任务数量 */
volatile long completedTasks;
/**
* 实例化Worker
*/
Worker(Runnable firstTask) {
setState(-1);
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** 工作线程执行,调用外部TheadPoolExecutor.runWorker方法 */
public void run() {
runWorker(this);
}
/**
* 判断当前Work是否空闲
*/
protected boolean isHeldExclusively() {
return getState() != 0;
}
/**
* tryAcquire 为AQS 尝试获取独占同步状态模板方法实现。
*/
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
/**
* tryRelease为AQS 尝试释放独占同步状态模板方法实现。
*/
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
/**
* 获取独占同步状态
*/
public void lock() { acquire(1); }
/**
* 尝试获取同步状态
*/
public boolean tryLock() { return tryAcquire(1); }
/**
* 释放独占同步状态
*/
public void unlock() { release(1); }
/**
* 判断能够护球同步状态
*/
public boolean isLocked() { return isHeldExclusively(); }
/**
* 中断work正在执行任务
*/
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
工作线程Worker的执行
work执行的核心是有条件无限循环,只要getTask不为null,线程会一直运行。不断获取任务执行。
**
* work执行逻辑。
* 内部存在一个for循环,不断循环获取任务执行。当线程池状态还在运行,work线程会一直运行不会推出循环
* getTask()线程返回null时退出,一般可能当前work超时被销毁或线程池不在运行。
* @param w
*/
final void runWorker(Worker w) {
/** 获取当前线程 **/
Thread wt = Thread.currentThread();
/** 获取执行任务**/
Runnable task = w.firstTask;
/** 将任务从work清理**/
w.firstTask = null;
/** 初始化同步状态为0(创建时为-1) **/
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
/**
* 如果当前work中存在任务则执行,不存在则从WorkQueue获取任务
* getTask()!=null 时work永远不停止
* **/
while (task != null || (task = getTask()) != null) {
/** 获取work独占同步状态 **/
w.lock();
/** 如果当前线程池的状态为STOP,将work中工作线程标记为中断
* 1、如果线程池状态>=stop,且当前线程没有设置中断状态,wt.interrupt()
2、如果一开始判断线程池状态<stop,但Thread.interrupted()为true,即线程已经被中断,又清除了中断标示,再次判断线程池状态是否>=stop
是,再次设置中断标示,wt.interrupt()
* 否,不做操作,清除中断标示后进行后续步骤
*
* **/
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
/** 模板方法给子类扩展 **/
beforeExecute(wt, task);
Throwable thrown = null;
try {
/** 处理任务 **/
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
/** 模板方法给子类扩展 **/
afterExecute(task, thrown);
}
} finally {
/** 重置任务 **/
task = null;
/** work执行的任务数量 **/
w.completedTasks++;
/** 释放work独占同步状态 **/
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
从WorkQueue获取任务
getTask()返回阻塞队列中等待的任务。当以下几种情况返回null 告知work需要销毁
1 线程池中状态 >= STOP 或者 线程池状态 == SHUTDOWN且阻塞队列为空
2 判断是否需要超时控制如果需要超时则等待超时时间内返回任务。超过时间返回null
/**
* 从WorkQueue获取任务
* 同时用来判断work何时退出销毁
*/
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
/** 无限循环,
* 当work超过指定时间没有获取时,设置timedOut = true进行二次遍历时销毁当前work **/
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
/** 线程池中状态 >= STOP 或者 线程池状态 == SHUTDOWN且阻塞队列为空,则停止worker - 1,return null **/
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
/** 获取work数量 **/
int wc = workerCountOf(c);
/** 判断是否需要开启work淘汰机制 **/
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
/**
* 以下几种情况直接销毁当前work
*
* 超时没有获取任务timedOut=tue,for循环遍历第二次时
* 当前任务超过maximumPoolSize
* **/
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
/**
* 如果开启work淘汰机制超时获取任务,调用poll阻塞获取任务,存在超时,如果超时没有获取到任务
* 设置timedOut = true 进入第二次循环销毁
*
* 如果没开启work淘汰机制超时获取任务,调用take阻塞获取任务
* 【这里的阻塞都能被中断响应!!】
**/
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
work退出回收
processWorkerExit
/**
* 执行work销毁退出操作
* work 要结束的worker
* completedAbruptly 表示是否需要对work数量-1操作
* runWorker 正常执行时 completedAbruptly 为false
* runWorker 执行出现异常 completedAbruptly 为true
*/
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
/** 从workers 集合中移除worker **/
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
/** 尝试将线程池状态设置为Terminate **/
tryTerminate();
int c = ctl.get();
/** **/
if (runStateLessThan(c, STOP)) {
/** 如果 work正常退出,需要判断当前线程数量 < 要维护的线程数量 如果是addWorker()添加一个非核心work **/
if (!completedAbruptly) {
/**
* 如果允许回收核心线程,且workQueue还存在需要处理任务 work线程需要大于1
* 如果不允许回收核心线程,则work线程需要大于corePoolSize
* **/
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
/** 如果 work 是异常退出 addWorker() 添加一个非核心work**/
addWorker(null, false);
}
}
网友评论