本文主要内容
- 什么是线程池
- 线程池的使用
- 线程池的原理
- 线程池中的位运算
- 源码解析
什么是线程池
如果频繁创建线程,也是会影响整体资源或效率的,线程池的产生是为了避免重复的创建线程和回收线程。线程池有以下几个作用:
- 降低资源消耗。通过重复利用已创建的线程降低线程创建、销毁线程造成的消耗
- 提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
- 提高线程的可管理性。线程是稀缺资源,如果入限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配、调优和监控。
线程池的使用
先来看看线程池的核心构造方法:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
//异常处理略过
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
需要用户指定以下几个关键变量:
- corePoolSize:核心线程数量
- maximumPoolSize:最大线程数量
- allowCoreThreadTimeOut:是否允许线程超时(设置为true时与keepAliveTime,TimeUnit一起起作用)
- keepAliveTime:线程存活时间(当线程池允许线程超时且运行中的线程数量超过- corePoolSize时,会按照此变量设置时间关闭线程)
- TimeUnit:单位(一般与keepAliveTime同时使用,供线程池判断是否满足关闭线程的条件)
- workQueue:缓冲队列
- RejectedExecutionHandler:拒绝处理任务类(默认:AbortPolicy 会抛异常,见下方实例)
- threadFactory:线程工厂(默认:DefaultThreadFactory)
明白这些核心数据的作用,就可以随意使用线程池了
线程池的原理
如果让我们自己来设计一个线程池,我们应该怎么做呢?线程池最大的功能在于线程重复使用,不需要每执行一个任务就重新构造新线程。
如果用户提交的任务比较多,显然我们需要一个缓存队列用于存储任务。
为了达到线程重复使用的目的,线程应该不停地从缓存队列中获取新的任务,并且执行它。

线程池中的位运算
线程池中使用一个AtomicInteger对象来表征线程池的状态和大小,为了达到1个int型数据能保存2个数据的目的,源码采用了非常精妙的位运算来实现。
//线程池的各个状态
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
//两个常量
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
//获取状态、线程池大小的方法以及一个或方法
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
//关键变量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
从源码可知,其实RUNNING等5个状态的值的前4位分别是:
- 1110 、0000、 0010 、0100、 0110
各状态后面接了28个0。
CAPACITY的值,前4位为0,后28位为1。
如果要获取线程池状态,则调用runStateOf此方法,ctl的值将与 ~CAPACITY 作与操作,得到的正是ctl值的前4位。
获取当前线程池大小,则是与CAPACITY 作与操作,正是取ctl的低位值。
所以 ctl 的高位值用于表征线程池状态,而低们值用于表征线程池的大小。
ctl的初始化,调用ctlOf方法,正是拿RUNNING与0作或操作,结果还是RUNNING,所以可知,在线程池初始化的时候,默认ctl的状态值为RUNNING,而线程池大小为0。
ctlOf方法的含义也可知,正是组合状态值与线程池大小。
源码解析
一般来说,线程池添加任务有两种方式,一种是execute方法,另一种是submit方法,submit方法其实也是会调用execute,所以在此只研究execute方法即可。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
//获取线程池状态
int c = ctl.get();
//如果线程池大小少于核心线程,则直接添加新的Worker并返回
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//如果核心线程数量已满,则考虑将任务添加进入缓存队列,然后再次检查状态
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//最后核心线程数量已满,缓存队列已满,那么直接交给拒绝策略处理
else if (!addWorker(command, false))
reject(command);
}
再来看非常关键的addWorker方法:
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
//获取线程池状态
int rs = runStateOf(c);
// 对状态进行检测,如果线程池已经被调用shutDown方法时,返回false
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
//获取线程池的大小
int wc = workerCountOf(c);
//如果线程池大小已经超过核心线程池大小,那么也直接返回,这种情况应该把任务缓存到队列中去
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//正常情况下,线程池大小加1即可
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
//新建一个Worker,在执行Worker构造方法时,创建了新的线程,即下面的t,具体可参见Worker的构造方法。
Worker w = new Worker(firstTask);
Thread t = w.thread;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int c = ctl.get();
int rs = runStateOf(c);
if (t == null ||
(rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null))) {
decrementWorkerCount();
tryTerminate();
return false;
}
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
} finally {
mainLock.unlock();
}
//启动线程
t.start();
// It is possible (but unlikely) for a thread to have been
// added to workers, but not yet started, during transition to
// STOP, which could result in a rare missed interrupt,
// because Thread.interrupt is not guaranteed to have any effect
// on a non-yet-started Thread (see Thread#interrupt).
if (runStateOf(ctl.get()) == STOP && ! t.isInterrupted())
t.interrupt();
return true;
}
在addWorker方法中,启动了一个新的线程,那我们来看看线程中的run方法,其实即是runWorker方法:
final void runWorker(Worker w) {
Runnable task = w.firstTask;
w.firstTask = null;
boolean completedAbruptly = true;
try {
//请注意,这是一个while循环,如果当前Worker的task不为空,则取当前Worker的task,执行任务
//如果当前task为空,则从缓存队列中取任务来执行,这就是线程池中最重要概念线程复用的体现
while (task != null || (task = getTask()) != null) {
w.lock();
clearInterruptsForTaskRun();
try {
beforeExecute(w.thread, task);
Throwable thrown = null;
try {
//执行任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
我们来看看getTask方法是否真的是从缓存队列中取任务:
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
boolean timed; // Are workers subject to culling?
for (;;) {
int wc = workerCountOf(c);
timed = allowCoreThreadTimeOut || wc > corePoolSize;
//正常情况下,线程池数量应该少于最大值,并且也不会timedOut,
//如果真的大于了最大值,则应该删除一个线程
if (wc <= maximumPoolSize && ! (timedOut && timed))
break;
if (compareAndDecrementWorkerCount(c))
return null;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
try {
//从缓存队列中取出任务
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
线程池分析到这,基本结束了,内中最多的细节,比如同步锁的使用,比如状态的判断,比如最后停止线程池等等,不再细说了,线程池介绍到这基本原理都清楚了,有点类似于android中的Looper,死循环中取消息,分发消息并执行对应操作等,线程池也是这样。
线程池中的位运算,刚开始可能看不懂,但在纸上写下各个数,基本就能明白各个方法的含义了。
最后再补充一个小细节,负数在计算机中的表示,为了更方便实现2进制数的加减法,负数使用补码表示,具体则是正数部分取反加1,而正数的补码则是正数本身。正因为如此,RUNNING 前4位才是1110,具体的各位可以计算下,补码的意义可以自行搜索。
网友评论