1、由于系统创建和销毁线程都会占用系统资源(CPU时间),如果对于某些执行耗时很少,但是数量很多的任务,大部分的时间都会花在创建和销毁线程,所以引入了线程池的概念;其实原理和数据库连接池(对象池)是一样的,都是为了避免某些不必要的损耗。
2、我们可以使用构造方法去初始化线程池,
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 5, 10, TimeUnit.MINUTES, new LinkedBlockingQueue<>(), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
其中有几个必要的参数:
(1)corePoolSize: 核心线程池的大小(可以理解为空闲时期线程池中最小数目,但是需要任何时期线程数量大于等于过corePoolSize),必须大于等于0。
(2)maximumPoolSize: 线程池中最大线程数量,必须大于0。
(3)keepAliveTime: 空闲时期非核心线程最大存活时间,必须大于0。
(4)workQueue: 任务队列。
(5)threadFactory: 线程创建工厂。
(6)handler: 饱和策略。
上面几个参数很好理解,也是初始化线程池的必要参数;然后线程池还提供了一个比较有意思的参数:
allowCoreThreadTimeOut: 是否允许核心线程超时,它的默认值是false
从字面意思理解,就是是否让核心线程超时销毁,所以我们一般所理解的核心线程不会被销毁,在这个值设置为true的时候,就是不正确的哦(我就被面试官问到过,然后还自信满满的说核心线程不会被销毁!);具体使用后面解释。
3、现在我们需要使用线程池来执行我们的任务,它提供了
// 使用Future模式,有三种重载
Future future = executor.submit(() -> System.out.println("do work"));
// 普通提交任务方式
executor.execute(() -> System.out.println("do work"));
等几种方法提交任务。但是最终都是使用execute方法去执行任务,只不过submit是对我们的任务进行了封装;所以我们关注execute的逻辑,究竟线程池是怎样帮助我们完成任务的。
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
// 获取当前运行状态以及线程数量
int c = ctl.get();
// 如果线程数小于核心线程数,则创建新线程(无论线程池中是否有可以执行任务的线程)
if (workerCountOf(c) < corePoolSize) {
// 创建新线程并且执行任务
if (addWorker(command, true))
return;
// 核心线程创建失败,获取当前运行状态以及线程数量
c = ctl.get();
}
// 判断是否可以接受新任务(当前运行状态),以及队列是否满
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 再次检测,如果当前运行状态是非RUNNING,而且任务移除成功,那么拒绝任务(会执行饱和策略)
if (! isRunning(recheck) && remove(command))
reject(command);
// 当前运行状态是RUNNING或者移除任务失败,再判断未终止的线程数是否等于0,是则创建新线程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 当运行状态为非RUNNING或者队列满了(逻辑上如此,但是实际在addWorker中如果运行状态是非RUNNING并且传入任务非空,是无法创建新线程的),创建新线程;如果创建失败(由于运行状态、或者最大线程数),则拒绝任务
else if (!addWorker(command, false))
reject(command);
以上是execute的源码,大致上的逻辑我们都清楚了,有一点需要我们注意,就是核心线程的创建,它并不是有可执行任务的核心线程就不去创建,而是只要当前线程数小于核心线程数的时候,有新任务添加就会直接创建核心线程,然后我们需要明白它是如何判断运行状态的;线程池中提供了几个常量:
// 位移位数 32 - 3
private static final int COUNT_BITS = Integer.SIZE - 3;
// 计量新建线程数量的最大值 000111...111(29个1)
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
// RUNNING状态 111000...000(29个0)
private static final int RUNNING = -1 << COUNT_BITS;
// SHUTDOWN状态 000000...000(32个0)
private static final int SHUTDOWN = 0 << COUNT_BITS;
// STOP状态 001000...000(29个0)
private static final int STOP = 1 << COUNT_BITS;
// TIDYING状态 010000...000(30个0)
private static final int TIDYING = 2 << COUNT_BITS;
// TERMINATED状态 011000...000(29个0)
private static final int TERMINATED = 3 << COUNT_BITS;
以上常量就是用来表示线程池运行状态的,然后记录当前运行状态以及线程数量用
// 将当前线程池设置为RUNNING状态,并计入0个线程
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
所以我们也就可以理解,它用了32位的整型做状态表示以及计数,前三位表示运行状态,后29位用来计数。所以对于源码里面的几个函数我们也就可以理解了
// 判断当前运行状态 c是ctl.get()获取的当前运行状态以及线程数量值,然后与上111000...000
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 计算当前线程数目 c是ctl.get()获取的,然后与上000111...111
private static int workerCountOf(int c) { return c & CAPACITY; }
// 运行状态(rs)下计入线程数量(wc)
private static int ctlOf(int rs, int wc) { return rs | wc; }
// 判断运行状态大小
private static boolean runStateLessThan(int c, int s) { return c < s; }
private static boolean runStateAtLeast(int c, int s) { return c >= s; }
// 判断是否RUNNING状态
private static boolean isRunning(int c) { return c < SHUTDOWN; }
现在关于整个execute的执行逻辑、判断条件基本理解,所以我们需要理解它是如何添加任务,如何让线程运行起来,执行完任务之后如何去等待继续去执行新任务。
以下是addWorker的源码,我们分为两部分分析,先看CAS判断:
retry:
for (;;) {
int c = ctl.get();
// 获取当前运行状态
int rs = runStateOf(c);
// Check if queue empty only if necessary.
//这里就是我们之前说的,如果运行状态是非RUNNING并且当前任务是非空,是无法创建新线程的
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
// 获取当前未终止的线程数量
int wc = workerCountOf(c);
// 判断数量是否超过限制(计数最大限制、核心线程限制、最大线程数量限制)
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 比较并增加1,如果成功,那么结束判断,进入创建线程逻辑
if (compareAndIncrementWorkerCount(c))
break retry;
// 重新判断运行状态,如果有变化,则重新进入retry循环,否则继续当前循环
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
以上是CAS算法判断是否能够新创建线程,如果成功break出retry循环,那么就进入创建线程的逻辑。
然后我们在分析线程创建:
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 创建Worker(就是我们的线程),这里Worker中会带一个Thread对象与它(Worker)做双向引用,后续分析Worker的工作原理
// firstTask就是我们真正的需要执行的任务
w = new Worker(firstTask);
// 这就是Worker中的Thread对象
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 将新创建的Worker加入HashSet中
workers.add(w);
// 记录至今最大的workers数量
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 如果创建成功,则启动Worker中的线程,这里很重要,这也是Worker的启动,帮助我们执行任务的关键,需要结合Worker初始化的源码分析,才能更好理解
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
// 如果创建失败,那么做失败处理
if (! workerStarted)
addWorkerFailed(w);
}
// 返回是否成功的标志
return workerStarted;
上面就是创建Worker(线程)的逻辑,比较关键的是Worker的初始化和启动,现在我们继续分析Worker的源码,理解它是如何与Thread做绑定,然后帮助我们执行任务的:
Worker(Runnable firstTask) {
// 这里是标志Worker状态
setState(-1); // inhibit interrupts until runWorker
// 需要执行的任务
this.firstTask = firstTask;
// 创建Thread,并且Thread中的Runnable对象是Worker本身
this.thread = getThreadFactory().newThread(this);
}
Worker其实也是实现了Runnable接口,从构造函数我们可以知道,在初始化Worker的时候,将本身和它的Thread对象进行双向引用,再结合addWorker中启动Worker中Thread的逻辑,就明白了,t.start实际上是执行了Worker中的run方法,然后我们继续分析Worker中的run方法,它执行了runWorker方法:
final void runWorker(Worker w) {
// 获取当前线程,也就是Worker中的Thread
Thread wt = Thread.currentThread();
// 获取Worker需要执行的任务(也就是我们实际的任务)
Runnable task = w.firstTask;
w.firstTask = null;
// 改变Worker的状态
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 判断当前任务是否为null,如果是空,则去队列获取任务
while (task != null || (task = getTask()) != null) {
// 改变Worker的状态
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 执行任务之前,可以自己重写该方法,从而做响应的预处理
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 执行我们实际的任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 执行任务之后,可以自己重写该方法,从而在结束之后做相应处理
afterExecute(task, thrown);
}
} finally {
task = null;
// 当前Worker至今完成的所有任务总和
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 由于获取任务超时终止当前Worker,这里对Worker做终止处理
processWorkerExit(w, completedAbruptly);
}
}
以上是Worker的工作原理,其中最主要的是getTask方法,这里就是保证它不退出一直WAITING或者TIMED_WAITING,等待任务入队的关键(这里有个面试题哟,面试官问过我,当线程执行完任务之后会处于什么状态,很多人可能认为会处于阻塞状态,因为BlockingQueue嘛,但是是不对哦,BlockingQueue中take方法是用了LockSupport.park来使当前线程进入WAITING,而poll(timeout, timeunit)方法则是用LockSupport.parkNanos使线程进入TIMED_WAITING!这里可以了解ReentrantLock;不过我们根据线程状态改变的条件也能推断,这里状态改变的情况);然后我们再来看一下getTask的源码:
/**
* 我们需要明白一点,当getTask方法返回null的时候,就表示当前调用Worker需要终止
*/
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
// 如果运行状态为SHUTDOWN并且队列为空或者运行状态是STOP,那么终止Worker
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
// 这里就是我们之前说的,那个比较有意思的属性,是否让核心线程超时
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 如果线程数量大于最大数量或者已经超时 并且 线程池中有线程或者队列为空,则尝试结束当前Worker
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 判断是否需要有超时获取任务
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
上面就是getTask的逻辑,然后主要就是在
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
如果timed是true,也就是当前线程数量大于核心数量或者是我们把allowCoreThreadTimeOut属性设置为true,那么就使用poll超时获取,否则使用take一直等待获取任务;所以其实对于线程池,核心线程也是有可能被销毁的!
到这里,我们基本将线程池的整个工作逻辑都串起来了,也基本明白它是如何帮助我们执行任务;但是这仅仅是主干逻辑,还有很多细节,比如它的shutdown处理、terminal处理以及Worker的状态改变等等。所以看似简单,但是要吃透,还需要更深入的理解。
如果有不正确的地方,请帮忙指正,谢谢!
网友评论