1. 向线程池提交任务
可以使用两个方法向线程池提交任务,分别是execute() 和 submit() 方法。
1.1 execute() 方法
execute()方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功,提交的方法源码如下:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 如果当前线程数小于corePoolSize,则启动新线程
if (workerCountOf(c) < corePoolSize) {
// 添加Worker,将command设置为worker线程的第一个任务开始执行
if (addWorker(command, true))
return;
c = ctl.get();
}
// 如果当前线程数大于或等于corePoolSize,则调用workQueue.offer放入队列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
// 放入队列中后发现没有线程执行任务,开启新线程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 线程数大于maxPoolSize,并且队列已满,调用拒绝策略
else if (!addWorker(command, false))
reject(command);
}
// 该方法用于启动新线程。如果第二个参数设置为true,则使用corePoolSize作为上限,否则使用maxPoolSize作为上限
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 如果线程池状态值起码是SHUTDOWN和STOP,或者第一个任务不是null,或者工作队列为空
// 那么则添加worker失败,返回false
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
// 工作线程数达到上限,要么是corePoolSize要么是maximumPoolSize,启动 线程失败
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 增加worker数量成功,返回到retry语句
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
// 如果线程池运行状态起码是SHUTDOWN,则重试retry标签语句,CAS
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// worker数量加1成功后,接着运行:
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 新建worker对象
w = new Worker(firstTask);
// 获取线程对象
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
// 加锁
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 由于线程已经在运行中,无法启动,抛异常
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 将线程对应的worker加入worker集合
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
// 释放锁
mainLock.unlock();
}
// 如果添加worker成功,则启动该worker对应的线程
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
// 如果启动新线程失败
if (! workerStarted)
// workCount - 1
addWorkerFailed(w);
}
return workerStarted;
}
1.2 submit() 方法
submit()方法用于提交需要返回值的任务。线程池会返回一个future类型的对象,通过这个future对象可以判断任务是否执行成功,并且通过futrue的get() 方法来获取返回值,get() 方法会阻塞当前线程直到任务完成,而使用get(long timeout,TimeUnit unit)方法则会阻塞当前线 程一段时间后立即返回,这时候有可能任务没有执行完。
Future<Object> future = executor.submit(harReturnValuetask);
try {
Object s = future.get();
} catch (InterruptedException e) {
// 处理中断异常
} catch (ExecutionException e) {
// 处理无法执行任务异常
} finally {
// 关闭线程池
executor.shutdown();
}
2. 任务的执行过程
在上面的任务提交过程中,可能会开启一个新的Worker,并把任务本身作为firstTask赋给该Worker。但是对于一个Worker来说,不是只执行一个任务,而是源源不断地从队列中取得任务执行,是不断循环的过程。
来看下Worker的run() 方法实现的过程。
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
// 当前Worker对象封装的线程
final Thread thread;
/** Initial task to run. Possibly null. */
// 线程需要运行的第一个任务。可以是null,如果是null,则线程从队列获取任务
Runnable firstTask;
/** Per-thread task counter */
// 记录线程执行完成的任务数量,每个线程一个计数器
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
/**
* 使用给定的第一个任务并利用线程工厂创建Worker实例
* @param firstTask 线程的第一个任务,如果没有,就设置为null,此时线程会从队列 获取任务。
*/
Worker(Runnable firstTask) {
// 线程处于阻塞状态,调用runWorker的时候中断
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
// 调用ThreadPoolExecutor的runWorker方法执行线程的运行
public void run() {
runWorker(this);
}
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
// 中断Worker封装的线程
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 如果线程初始任务不是null,或者从队列获取的任务不是null,表示该线程应该执行任务。
while (task != null || (task = getTask()) != null) {
// 获取线程锁
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
// 如果线程池停止了,确保线程被中断
// 如果线程池正在运行,确保线程不被中断
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
// 获取到任务后,再次检查线程池状态,如果发现线程池已经停止,则给自己发 中断信号
wt.interrupt();
try {
// 任务执行之前的钩子方法,实现为空
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 任务执行结束后的钩子方法,实现为空
afterExecute(task, thrown);
}
} finally {
// 任务执行完成,将task设置为null
task = null;
// 线程已完成的任务数加1
w.completedTasks++;
// 释放线程锁
w.unlock();
}
}
// 判断线程是否是正常退出
completedAbruptly = false;
} finally {
// Worker退出
processWorkerExit(w, completedAbruptly);
}
}
3. 优雅的关闭线程池
线程池的关闭,较之线程的关闭更加复杂。当关闭一个线程池的时候,有的线程还正在执行某个任务,有的调用者正在向线程池提交任务,并且队列中可能还有未执行的任务。因此,关闭过程不可能是瞬时的,而是需要一个平滑的过渡,这就涉及线程池的完整生命周期管理。
3.1 线程池的生命周期
JDK7中,把线程数量(workerCount) 和线程池状态(runState)这两个变量打包存储在一个字段中。最高的3位存储线程池的状态,其余29位存储线程个数。JDK6中,这两个变量是分开存储的。
int类型的ctl变量四个字节32位的值
ThreadPoolExecutor
由上面的代码可以看到,ctl变量被拆成两半,最高的3位用来表示线程池的状态,低的29位表示线程的个数。线程池的状态有五种,分别是RUNNING、SHUTDOWN、STOP、TIDYING和TERMINATED。
下面分析状态之间的迁移过程,如图所示:
线程池状态的迁移过程
线程池有两个关闭方法,shutdown()和shutdownNow(),这两个方法会让线程池切换到不同的状态。在队列为空,线程池也为空之后,进入TIDYING 状态;最后执行一个钩子方法terminated(),进入TERMINATED状态,线程池才真正关闭。
这里的状态迁移有一个非常关键的特征:从小到大迁移,-1,0,1,2,3,只会从小的状态值往大的状态值迁移,不会逆向迁移。例如,当线程池的状态在TIDYING=2时,接下来只可能迁移到TERMINATED=3,不可能迁移回STOP=1或者其他状态。
除 terminated()之外,线程池还提供了其他几个钩子方法,这些方法的实现都是空的。如果想实现自己的线程池,可以重写这几个方法:
protected void beforeExecute(Thread t, Runnable r) { }
protected void afterExecute(Runnable r, Throwable t) { }
protected void terminated() { }
3.2 正确关闭线程池的步骤
关闭线程池的过程为:在调用 shutdown()或者shutdownNow()之后,线程池并不会立即关闭,接下来需要调用 awaitTermination() 来等待线程池关闭。关闭线程池的正确步骤如下:
// executor.shutdownNow();
executor.shutdown();
try {
boolean flag = true;
do {
flag = ! executor.awaitTermination(500, TimeUnit.MILLISECONDS);
} while (flag);
} catch (InterruptedException e) {
// ...
}
awaitTermination(...)方法的内部实现很简单,如下所示。不断循环判断线程池是否到达了最终状态TERMINATED,如果是,就返回;如果不是,则通过termination条件变量阻塞一段时间,之后继续判断。
awaitTermination()
3.3 shutdown()与shutdownNow()的区别
- shutdown()不会清空任务队列,会等所有任务执行完成,shutdownNow()清空任务队列。
-
shutdown()只会中断空闲的线程,shutdownNow()会中断所有线程。
shutdown()
shutdownNow()
下面看一下在上面的代码里中断空闲线程和中断所有线程的区别。
shutdown()方法中的interruptIdleWorkers()方法的实现:
interruptIdleWorkers()
interruptIdleWorkers()
关键区别点在tryLock():一个线程在执行一个任务之前,会先加锁,这意味着通过是否持有锁,可以判断出线程是否处于空闲状态。tryLock()如果调用成功,说明线程处于空闲状态,向其发送中断信号;否则不发送。
tryLock()方法:
tryLock()
tryAcquire方法:
tryAcquire()
shutdownNow()调用了 interruptWorkers(); 方法:
interruptWorkers()
interruptIfStarted() 方法的实现:
interruptIfStarted()
在上面的代码中,shutdown() 和shutdownNow()都调用了tryTerminate()方法,如下所示:
final void tryTerminate() {
for (;;) {
int c = ctl.get();
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
// 当workQueue为空,wordCount为0时,执行下述代码。
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 将状态切换到到TIDYING状态
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
// 调用钩子函数
terminated();
} finally {
// 将状态由TIDYING改为 TERMINATED
ctl.set(ctlOf(TERMINATED, 0));
// 通知awaitTermination(...)
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
tryTerminate()不会强行终止线程池,只是做了一下检测:当workerCount为0,workerQueue为空时,先把状态切换到TIDYING,然后调用钩子方法terminated()。当钩子方法执行完成时,把状态从TIDYING 改为 TERMINATED,接着调用termination.sinaglAll(),通知前面阻塞在awaitTermination的所有调用者线程。
所以,TIDYING和TREMINATED的区别是在二者之间执行了一个钩子方法terminated(),目前是一个空实现。
4. 合理地配置线程池
要想合理地配置线程池,就必须首先分析任务特性,可以从以下几个角度来分析。
1.任务的性质:CPU密集型任务、IO密集型任务和混合型任务。
2.任务的优先级:高、中和低。
3.任务的执行时间:长、中和短。
4.任务的依赖性:是否依赖其他系统资源,如数据库连接。
4.1 为IO密集型任务确定线程数
由于 IO 密集型任务的 CPU 使用率较低,导致线程空余时间很多,所以通常就需要开 CPU核心数两倍的线程。当 IO 线程空闲时,可以启用其他线程继续使用 CPU,以提高 CPU 的使用率。
IO密集型时,大部分线程都阻塞,故需要多配置线程数,
公式:
(1)2*cpu核数
(2)CPU核数/ 1-阻塞系数(0.8~0.9)
Netty 的 IO 处理任务,就是典型的 IO 密集型任务。所以,Netty 的 Reactor 反应器实现类(定制版的线程池)的 IO 处理线程数,默认正好为 CPU 核数的 2 倍,以下是其相关的代码:
//多线程版本 Reactor 反应器组
public abstract class MultithreadEventLoopGroup extends
MultithreadEventExecutorGroup implements EventLoopGroup {
//IO 事件处理线程数,反应器数量
private static final int DEFAULT_EVENT_LOOP_THREADS;
//IO 事件处理线程数默认值:CPU 核数的 2 倍
static {
DEFAULT_EVENT_LOOP_THREADS = Math.max(1,
SystemPropertyUtil.getInt("io.netty.eventLoopThreads",
Runtime.getRuntime().availableProcessors() * 2));
}
/**
*构造器
*/
protected MultithreadEventLoopGroup(int nThreads,
ThreadFactory threadFactory, Object... args) {
super(nThreads == 0?
DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory,
args);
}
//...
}
4.2 为CPU密集型任务确定线程数
CPU 密集型任务也叫计算密集型任务,其特点是要进行大量计算而需要消耗 CPU 资源,比如计算圆周率、对视频进行高清解码等等。CPU 密集型任务虽然也可以并行完成,但是并行的任务越多,花在任务切换的时间就越多,CPU 执行任务的效率就越低,所以,要最高效地利用 CPU,CPU 密集型任务的并行执行的数量应当等于 CPU 的核心数。
比如说 4 个核心的 CPU,通过 4 个线程并行执行 4 个 CPU 密集型任务,此时的效率是最高的。但是如果线程数远远超出 CPU 核心数量,需要频繁的切换线程,线程上下文切换时需要消耗时间的,反而会使得任务效率下降。因此对于 CPU 密集型的任务来说,线程数等于 CPU 数就行。
4.3 为混合型任务确定线程数
混合型任务既要执行逻辑计算,又要进行大量非 CPU 耗时操作(如 RPC 调用、数据库访问、网络通信等),所以,混合型任务 CPU 利用率不是太高,非 CPU 耗时往往是 CPU 耗时的数倍。比如在 Web 应用处理 HTTP 请求处理时,一次请求处理会包括 DB 操作、RPC 操作、缓存操作等多种耗时操作。一般来说,一次 Web 请求的 CPU 计算耗时往往较少,大致在 100ms-500ms 之间,而其他耗时操作会占用 500ms-1000ms 甚至更多的时间。
估算公式:
最佳线程数目 =(线程等待时间与线程 CPU 时间之比 + 1)* CPU 核数
网友评论