关于线程池的类图如下
image.png
因为线程池好多方法都是ThreadPoolExecutor来实现的,这里我们先看下ThreadPoolExecutor
在文章开头我们先陈诉下概念
在这篇文章中我们将我们需要提交给线程池执行的 统称为任务如:
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) Executors.newFixedThreadPool(5);
for (int i=0;i<20;i++){
//new ThreadDemo("task-"+i)就是一个任务
threadPoolExecutor.execute(new ThreadDemo("task-"+i));
}
一、参数
/**记录线程池状态以及数量 前三位表示线程池状态
**初始状态为 1110 0000 0000 0000 0000 0000 0000 0000*/
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//线程数量统计位数29 Integer.SIZE为32
private static final int COUNT_BITS = Integer.SIZE - 3;
//线程池容量最大值 二进制 0001 1111 1111 1111 1111 1111 1111 1111
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
/**线程池运行状态 JAVA -1二进制为32个1 (二进制首位为1代表)
**所以二进制为 1110 0000 0000 0000 0000 0000 0000 0000 **/
private static final int RUNNING = -1 << COUNT_BITS;
//线程池关闭状态 二进制为 0000 0000 0000 0000 0000 0000 0000 0000
private static final int SHUTDOWN = 0 << COUNT_BITS;
//线程池停止状态 二进制为 0010 0000 0000 0000 0000 0000 0000 0000
private static final int STOP = 1 << COUNT_BITS;
//线程池整理状态 二进制为 0100 0000 0000 0000 0000 0000 0000 0000
private static final int TIDYING = 2 << COUNT_BITS;
// 线程池终止状态二进制为 0110 0000 0000 0000 0000 0000 0000 0000
private static final int TERMINATED = 3 << COUNT_BITS;
//这里将一部分方法放到这里来看便于理解线程池的状态
/**返回线程池状态 ~CAPACITY取反操作 正数的补码为正数的原码
**CAPACITY的补码 ~CAPACITY 为 1110 0000 0000 0000 0000 0000 0000 0000 **/
private static int runStateOf(int c) { return c & ~CAPACITY; }
//返回线程个数 初始状态 ctl &CAPACITY 为0
private static int workerCountOf(int c) { return c & CAPACITY; }
//
private static int ctlOf(int rs, int wc) { return rs | wc; }
//存放阻塞任务的队列
private final BlockingQueue<Runnable> workQueue;
//锁
private final ReentrantLock mainLock = new ReentrantLock();
//线程池中正在运行状态的线程(工作线程)
private final HashSet<Worker> workers = new HashSet<Worker>();
//锁条件
private final Condition termination = mainLock.newCondition();
//
private int largestPoolSize;
//统计已经完成的任务
private long completedTaskCount;
//线程工厂
private volatile ThreadFactory threadFactory;
//线程池拒绝策略
private volatile RejectedExecutionHandler handler;
//空闲线程存活时间
private volatile long keepAliveTime;
//默认为false,为false时,核心线程空闲时,仍然运行,为true 核心线程数空闲keepAliveTime时间将停止
private volatile boolean allowCoreThreadTimeOut;
//
private volatile int corePoolSize;
//线程池的最大容量,受CAPACITY影响
private volatile int maximumPoolSize;
//
private final AccessControlContext acc;
二、线程池状态及转换
在上面我们可以看到线程池有五种状态 运行、关闭、停止、整理、终止五种状态。在这里我们先简单画个状态转换图
image.png
线程池区别线程,对于线程池状态,只能单向转换,并不能像线程一样有些状态可以双向转换,我们平时操作的也是shutdown()、shutdownNow()方法,对于线程池状态由SHUTDOWN(关闭)或STOP(停止)转向TIDYING(整理)状态是由shutdown()或shutdownNow()内部进行自行整理。
shutdown()和shutdownNow()区别在于,shutdown()会等待阻塞任务队列执行完毕(能正常添加阻塞队列中任务到工作线程只可能是RUNNING 状态),shutdownNow会立即向正在运行的工作线程执行中断请求,并且不会将阻塞任务添加到工作线程。
在这里我们先看2个例子,有助于我们理解后续线程池方法(我们可以在后续很条件判断看到线程池状态判断 区分运行或关闭状态为一种情况,其他状态为另一种种情况)。
例1:
public class FixedThreadPoolDemo {
static class ThreadDemo implements Runnable{
private String threadName;
public ThreadDemo(String name){
threadName= name;
}
@Override
public void run() {
Date date = new Date();
SimpleDateFormat dateFormat= new SimpleDateFormat("yyyy-MM-dd :hh:mm:ss");
System.out.println("当前线程运行的是:"+ threadName+ ",开始时间为:"+dateFormat.format(date));
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String []args) throws InterruptedException {
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) Executors.newFixedThreadPool(5);
for (int i=0;i<20;i++){
threadPoolExecutor.execute(new ThreadDemo("task-"+i));
}
Thread.sleep(5000);
System.out.println("开始执行shutdown");
threadPoolExecutor.shutdown();
}
}
因为我们执行的是shutdown(),所以这20个任务都会执行完毕。
当前线程运行的是:task-2,开始时间为:2019-05-18 :10:38:46
当前线程运行的是:task-0,开始时间为:2019-05-18 :10:38:46
当前线程运行的是:task-1,开始时间为:2019-05-18 :10:38:46
当前线程运行的是:task-4,开始时间为:2019-05-18 :10:38:46
当前线程运行的是:task-3,开始时间为:2019-05-18 :10:38:46
开始执行shutdown
当前线程运行的是:task-5,开始时间为:2019-05-18 :10:38:51
当前线程运行的是:task-8,开始时间为:2019-05-18 :10:38:51
当前线程运行的是:task-6,开始时间为:2019-05-18 :10:38:51
当前线程运行的是:task-7,开始时间为:2019-05-18 :10:38:51
当前线程运行的是:task-9,开始时间为:2019-05-18 :10:38:51
当前线程运行的是:task-10,开始时间为:2019-05-18 :10:38:56
当前线程运行的是:task-11,开始时间为:2019-05-18 :10:38:56
当前线程运行的是:task-13,开始时间为:2019-05-18 :10:38:56
当前线程运行的是:task-14,开始时间为:2019-05-18 :10:38:56
当前线程运行的是:task-12,开始时间为:2019-05-18 :10:38:56
当前线程运行的是:task-15,开始时间为:2019-05-18 :10:39:01
当前线程运行的是:task-18,开始时间为:2019-05-18 :10:39:01
当前线程运行的是:task-19,开始时间为:2019-05-18 :10:39:01
当前线程运行的是:task-17,开始时间为:2019-05-18 :10:39:01
当前线程运行的是:task-16,开始时间为:2019-05-18 :10:39:01
例2:
public class FixedThreadPoolDemo2 {
static class ThreadDemo implements Runnable{
private String threadName;
public ThreadDemo(String name){
threadName= name;
}
@Override
public void run() {
Date date = new Date();
SimpleDateFormat dateFormat= new SimpleDateFormat("yyyy-MM-dd :hh:mm:ss");
System.out.println("当前线程运行的是:"+ threadName+ ",开始时间为:"+dateFormat.format(date));
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Date date1 = new Date();
System.out.println("-----当前线程运行的是:"+ threadName + ",结束时间为:"+dateFormat.format(date1));
}
}
public static void main(String []args) throws InterruptedException {
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) Executors.newFixedThreadPool(5);
for (int i=0;i<20;i++){
threadPoolExecutor.execute(new ThreadDemo("task-"+i));
}
Thread.sleep(6000);
System.out.println("开始执行shutdownNow");
threadPoolExecutor.shutdownNow();
}
}
因为我们执行shutdownNow()方法,所以我们只会执行前10个任务,在前面
线程状态与方法我们说过interrupt()方法的用处,这里也可以看到当线程执行interrupt()将中断标记置为true,线程被sleep阻塞,则会抛出InterruptedException()异常
当前线程运行的是:task-4,开始时间为:2019-05-18 :10:43:43
当前线程运行的是:task-0,开始时间为:2019-05-18 :10:43:43
当前线程运行的是:task-3,开始时间为:2019-05-18 :10:43:43
当前线程运行的是:task-2,开始时间为:2019-05-18 :10:43:43
当前线程运行的是:task-1,开始时间为:2019-05-18 :10:43:43
-----当前线程运行的是:task-0,结束时间为:2019-05-18 :10:43:48
-----当前线程运行的是:task-3,结束时间为:2019-05-18 :10:43:48
-----当前线程运行的是:task-2,结束时间为:2019-05-18 :10:43:48
-----当前线程运行的是:task-4,结束时间为:2019-05-18 :10:43:48
-----当前线程运行的是:task-1,结束时间为:2019-05-18 :10:43:48
当前线程运行的是:task-5,开始时间为:2019-05-18 :10:43:48
当前线程运行的是:task-6,开始时间为:2019-05-18 :10:43:48
当前线程运行的是:task-8,开始时间为:2019-05-18 :10:43:48
当前线程运行的是:task-7,开始时间为:2019-05-18 :10:43:48
当前线程运行的是:task-9,开始时间为:2019-05-18 :10:43:48
开始执行shutdownNow
java.lang.InterruptedException: sleep interrupted
-----当前线程运行的是:task-5,结束时间为:2019-05-18 :10:43:49
-----当前线程运行的是:task-8,结束时间为:2019-05-18 :10:43:49
-----当前线程运行的是:task-7,结束时间为:2019-05-18 :10:43:49
-----当前线程运行的是:task-9,结束时间为:2019-05-18 :10:43:49
-----当前线程运行的是:task-6,结束时间为:2019-05-18 :10:43:49
at java.lang.Thread.sleep(Native Method)
at com.example.threaddemo.threadpool.FixedThreadPoolDemo2$ThreadDemo.run(FixedThreadPoolDemo2.java:31)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
java.lang.InterruptedException: sleep interrupted
at java.lang.Thread.sleep(Native Method)
at com.example.threaddemo.threadpool.FixedThreadPoolDemo2$ThreadDemo.run(FixedThreadPoolDemo2.java:31)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
这里再注意一点,只有线程池处于RUNNING (运行)状态时,才可以往阻塞任务队列添加任务,在这里我们要区分阻塞任务队列和工作线程。
这里看个例子
public class FixedThreadPoolDemo3 {
static ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) Executors.newFixedThreadPool(5);
static class ThreadDemo implements Runnable{
private String threadName;
public ThreadDemo(String name){
threadName = name;
}
@Override
public void run() {
Date date = new Date();
SimpleDateFormat dateFormat= new SimpleDateFormat("yyyy-MM-dd :hh:mm:ss");
System.out.println("当前线程运行的是:"+ threadName + ",开始时间为:"+dateFormat.format(date));
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
static class AddThread implements Runnable{
@Override
public void run() {
//往线程池添加任务进入
for (int i=0;i<20;i++){
threadPoolExecutor.execute(new ThreadDemo("task-"+i));
System.out.println("-------------添加task-"+i);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public static void main(String []args) throws InterruptedException {
Thread thread = new Thread(new AddThread());
thread.start();
Thread.sleep(8000);
System.out.println("开始执行shutdown");
threadPoolExecutor.shutdown();
}
}
因为只有线程池处于RUNNING (运行)状态时,才可以将多余任务放入阻塞任务队列,处于关闭状态时,线程池只能从阻塞任务队列去轮询,而不能去添加新的阻塞任务。
结果如下
-------------添加task-0
当前线程运行的是:task-0,开始时间为:2019-05-18 :01:59:38
-------------添加task-1
当前线程运行的是:task-1,开始时间为:2019-05-18 :01:59:39
-------------添加task-2
当前线程运行的是:task-2,开始时间为:2019-05-18 :01:59:40
-------------添加task-3
当前线程运行的是:task-3,开始时间为:2019-05-18 :01:59:41
-------------添加task-4
当前线程运行的是:task-4,开始时间为:2019-05-18 :01:59:42
-------------添加task-5
当前线程运行的是:task-5,开始时间为:2019-05-18 :01:59:43
-------------添加task-6
当前线程运行的是:task-6,开始时间为:2019-05-18 :01:59:44
-------------添加task-7
当前线程运行的是:task-7,开始时间为:2019-05-18 :01:59:45
开始执行shutdown
Exception in thread "Thread-0" java.util.concurrent.RejectedExecutionException: Task com.example.threaddemo.threadpool.FixedThreadPoolDemo3$ThreadDemo@426b98be rejected from java.util.concurrent.ThreadPoolExecutor@763f3e2e[Shutting down, pool size = 4, active threads = 4, queued tasks = 0, completed tasks = 4]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
at com.example.threaddemo.threadpool.FixedThreadPoolDemo3$AddThread.run(FixedThreadPoolDemo3.java:44)
at java.lang.Thread.run(Thread.java:748)
三、内部类
1、Worker
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**序列化ID**/
private static final long serialVersionUID = 6138294804551838833L;
final Thread thread;
Runnable firstTask;
volatile long completedTasks;
Worker(Runnable firstTask) {
setState(-1);
this.firstTask = firstTask;
/**默认采用Executors中内部类DefaultThreadFactory 默认情况下会创建一个相同的线程组,这里这个this 把自身传入进去,这是实现线程复用的关键**/
this.thread = getThreadFactory().newThread(this);
}
//这部分重写run方法,因为上面构造thread传入this,所以当thread进行start()方法时,会调用外层重写的这个run()
public void run() {
runWorker(this);
}
protected boolean isHeldExclusively() {
return getState() != 0;
}
//因为我们刚开始将state状态置为-1,所以shutdown加锁会不成功
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
//这里后续shutdownNow方法会调用到
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
四、方法
1、ThreadPoolExecutor
线程池的构造方法都是通过这个构造方法实现的
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
2、execute
提交任务,如果可以线程池可以开启新的任务,则执行,否则这个任务会放到阻塞任务队列,或者拒绝任务。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
//获取线程池状态以及数量 初始状态为线程池运行状态
int c = ctl.get();
/**在这里进行三步判断
* 1.如果线程池中线程数量小于线程池核心线程数,则去尝试新增线程
* 这里我们考虑个问题Q1 :为什么这里没有对线程池状态进行判断**/
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
/**2.如果线程池处于运行状态并且可以将任务添加进入阻塞任务队列
*offer为各个队列实现的方法,后续文章讲解**/
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
/**如果线程不是运行状态并且可以从阻塞队列移除任务(加锁操作)
*这里我们考虑个问题Q2:如果线程池在上面一层判断是处于运行状态,且
*已经成功加入到阻塞队列,中间调用shutdown方法,线程池状态变为关闭
*状态,那么此时将从阻塞队列移除掉这个任务吗? **/
if (! isRunning(recheck) && remove(command))
reject(command);
//当前线程池中工作线程数为0
else if (workerCountOf(recheck) == 0)
//这里因为上面已经将任务添加到阻塞任务队列,所以传null进入,等待线程轮询任务
addWorker(null, false);
}
/**3.添加新增线程失败(对比maximumPoolSize ) 则执行拒绝策略**/
else if (!addWorker(command, false))
reject(command);
}
关于问题Q2.我们可以做个例子验证下,在这个例子中我们要把ThreadPoolExecutor源码修改如下(采取将源码复制出来修改部分代码,后续会放上代码链接),具体如下
/**对于ThreadPoolExecutor类中execute方法修改如下,不想直接改源码,所以采用
*的将源码复制出来**/
if (isRunning(c) && workQueue.offer(command)) {
System.out.println("第一次判断线程池状态是否运行状态:"+isRunning(c));
//休眠一段时间,保证后续状态改变
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
int recheck = ctl.get();
System.out.println("第二次判断线程池状态是否运行状态:"+isRunning(recheck));
if (! isRunning(recheck) && remove(command)) {
System.out.println("####任务被拒绝");
reject(command);
} else if (workerCountOf(recheck) == 0) {
addWorker(null, false);
}
}
public class FixedThreadPoolDemo4 {
static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 5,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
static class ThreadDemo implements Runnable{
private String threadName;
public ThreadDemo(String name){
threadName = name;
}
@Override
public void run() {
Date date = new Date();
SimpleDateFormat dateFormat= new SimpleDateFormat("yyyy-MM-dd :hh:mm:ss");
System.out.println("当前线程运行的是:"+ threadName + ",开始时间为:"+dateFormat.format(date));
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Date date1 = new Date();
System.out.println("-----当前线程运行的是:"+ threadName + ",结束时间为:"+dateFormat.format(date1));
}
}
static class AddThread implements Runnable{
@Override
public void run() {
//往线程池添加任务进入
for (int i=0;i<20;i++){
System.out.println("-------------添加task-"+i+"开始");
threadPoolExecutor.execute(new ThreadDemo("task-"+i));
System.out.println("-------------添加task-"+i+"结束");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
static class UpdateThread implements Runnable{
@Override
public void run() {
Date date = new Date();
SimpleDateFormat dateFormat= new SimpleDateFormat("yyyy-MM-dd :hh:mm:ss");
System.out.println("执行线程池关闭开始时间:"+dateFormat.format(date));
threadPoolExecutor.shutdown();
Date date1 = new Date();
System.out.println("执行线程池关闭关闭时间:"+dateFormat.format(date));
}
}
public static void main(String []args) throws InterruptedException {
Thread thread = new Thread(new AddThread());
thread.start();
Thread.sleep(6000);
Thread thread1 = new Thread(new UpdateThread());
thread1.start();
}
}
因为我们在第二次读取线程池状态前进行了shutdown操作,所以导致线程池直接拒绝任务,结果如下(对于其中的空指针异常可以忽略,因为在复制出来的ThreadPoolExecutor类代码部分要reject()引用jre原来的类,为了编译通过将传递对象置为空)。
-------------添加task-0开始
-------------添加task-0结束
当前线程运行的是:task-0,开始时间为:2019-05-18 :02:25:10
-------------添加task-1开始
-------------添加task-1结束
当前线程运行的是:task-1,开始时间为:2019-05-18 :02:25:11
-------------添加task-2开始
-------------添加task-2结束
当前线程运行的是:task-2,开始时间为:2019-05-18 :02:25:12
-------------添加task-3开始
-------------添加task-3结束
当前线程运行的是:task-3,开始时间为:2019-05-18 :02:25:13
-------------添加task-4开始
-------------添加task-4结束
当前线程运行的是:task-4,开始时间为:2019-05-18 :02:25:14
-------------添加task-5开始
第一次判断线程池状态是否运行状态:true
执行线程池关闭开始时间:2019-05-18 :02:25:16
执行线程池关闭关闭时间:2019-05-18 :02:25:16
第二次判断线程池状态是否运行状态:false
####任务被拒绝
Exception in thread "Thread-0" java.lang.NullPointerException
-----当前线程运行的是:task-0,结束时间为:2019-05-18 :02:25:20
at com.example.threaddemo.util.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:1859)
at com.example.threaddemo.util.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:563)
at com.example.threaddemo.util.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1138)
at com.example.threaddemo.threadpool.FixedThreadPoolDemo4$AddThread.run(FixedThreadPoolDemo4.java:50)
at java.lang.Thread.run(Thread.java:748)
-----当前线程运行的是:task-1,结束时间为:2019-05-18 :02:25:21
-----当前线程运行的是:task-2,结束时间为:2019-05-18 :02:25:22
-----当前线程运行的是:task-3,结束时间为:2019-05-18 :02:25:23
-----当前线程运行的是:task-4,结束时间为:2019-05-18 :02:25:24
关于execute方法流程图如下
image.png
3、addWorker
addWorker向线程池新增线程,返回true代表线程新增并且启动成功。
以下几种情况会添加失败
- 1.线程池处于停止、整理、终止状态
- 2.线程池状态为关闭状态 , firstTask 不为空或者workQueue为空
- 3.线程池线程数大于核心线程数或者最大线程数 (core决定)
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
/*** condition1
** 1.rs>=SHUTDOWN 线程池处于者停止、整理、终止、关闭状态
** 2.! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty())
** 这个表达式会为true是以下几种情况:
** --线程池处于停止、整理、终止状态
** -- firstTask 不为空,线程池状态为 关闭状态
** -- workQueue 为空,线程池状态为 关闭状态
** 整体表达式成立的话是以下三种情况中一种或者多种
** --线程池处于停止、整理、终止状态
** --firstTask 不为空 ,线程池状态为关闭状态
** --workQueue 为空,线程池状态为关闭状态
** 整体表达式不成立是以下几种情况之一
** --线程池处于运行状态
** --线程池处于关闭状态且firstTask =null且workQueue不为空
** 我们之前在二、线程池状态及转换中简单介绍了线程池状态,对于可以
** 添加到工作线程,只可能是线程是处于运行状态或者线程池关闭状态
**对于关机状态而言,我们只能添加阻塞队列中的任务到工作线程,这也
** 印证了我们之前的问题 Q1
***/
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
//判断是否超出工作线程最大值
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//CAS操作成功,则跳出循环
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get();
//如果线程池状态发生改变,则去检查是否符合condition1
if (runStateOf(c) != rs)
continue retry;
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
//线程池处于运行中状态或者线程池处于关闭状态并且firstTask 为null
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive())
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
//因为前面判断了核心线程数和最大线程数,所以只有有限个核心线程数或者最大线程数线程可以启动
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
//处理失败线程
addWorkerFailed(w);
}
return workerStarted;
}
关于此方法流程图如下
image.png4、runWorker
前面在内部类说了下,创建Worker时构建thread会将自身传入进去,所以当thread调用start方法,会调用Worker中的run方法,run方法调用runWorker()方法。也就是说我们创建了多少个线程(取决于corePoolSize、maximumPoolSize),就会执行多少次runWorker()方法。
我们提交给线程池的任务就是firstTask
这里我们强调一个概念,虽然我们提交给线程池的任务一般都是实现Runnable接口或者继承Thread类,但是我们没有start()这些实例,所以这些任务还是单纯的实例,并不是线程,我们执行任务的run()方法,和执行普通方法没有什么差别。这样我们才能在方法中轮询任务(如果不是这样的话,我们每次在方法中新启线程,对于新的线程我们是没法做到栈同步)。
换句话说,如果我们任务实现的Runnable接口包含一个test方法,如果下面源码中task.run(),换成task.test(),我们只需要在我们任务重写test方法中实现我们的逻辑,提交到线程池也是会正常执行的(这个例子有个前提是线程start方法也会调用test方法)。
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
/**Thread.interrupted() 保证清除线程中断状态
**这个表达式保证线程池处于运行或关闭状态 中断状态标记清除
**线程池处于停止、整理、终止状态 线程中断
**/
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
//这个和执行普通方法没有什么区别,并不是线程start方法后需要调用 run方法
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
//判断线程有没有发生异常
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
5、getTask
获取阻塞队列中的任务
这个方法中包含对keepAliveTime判断,如果当前阻塞队列没有任务,我们设置了keepAliveTime,则阻塞队列会在有限时间内做出返回,如果我们不设置,则会一直等待直到有新的任务加入(在此期间会一直执行在 workQueue.take(),导致线程中代码不往下执行,所以线程不会被回收),
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
/** 有2种情况会不通过,也就是我们上面说的 线程状态分为2种 大多数情况下
**1.线程处于运行状态
**2.线程处于关闭状态而且阻塞队列不为空*/
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// 判断当前工作线程数大于核心线程数或allowCoreThreadTimeOut
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 当前工作线程数大于最大线程数
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
/**阻塞队列各自实现类实现 ,后续讲解(这些方法会在阻塞队列中移除掉任务)
*poll 如果当前队列为空,则等待keepAliveTime时间返回
*take队列为空,则一直等待
*这个是实现线程池工作线程空闲时间基础
* 如果我们没有设置keepAliveTime,则当任务执行完毕,线程池中线程*
*会一直等待新的任务**/
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
6、processWorkerExit
当线程中没有任务时执行整理线程
private void processWorkerExit(Worker w, boolean completedAbruptly) {
//线程发生异常,线程数减一
if (completedAbruptly)
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//统计任务完成数
completedTaskCount += w.completedTasks;
//
workers.remove(w);
} finally {
mainLock.unlock();
}
//整理线程池
tryTerminate();
int c = ctl.get();
//线程池处于运行或者关闭状态去检查存活线程数量,维持一个空的线程
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return;
}
addWorker(null, false);
}
}
7、tryTerminate
final void tryTerminate() {
for (;;) {
int c = ctl.get();
/**排除 1、线程池处于运行状态 2、线程池处于整理状态
* 3、线程池处于终止状态 4、线程池处于关闭状态但是阻塞队列不为空**/
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
if (workerCountOf(c) != 0) {
//尝试去中断工作线程
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
//提供一些方法以供后续子类使用
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
}
}
8、shutdown
这里我们看下前面说shutdown方法和shutdownNow方法区别
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//检查权限
checkShutdownAccess();
//线程池状态置为 关闭状态
advanceRunState(SHUTDOWN);
interruptIdleWorkers();
//子类实现
onShutdown();
} finally {
mainLock.unlock();
}
tryTerminate();
}
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
//尝试去中断线程
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
/**注意这里调用内部类tryLock方法 ,因为我们刚开始将state状态置为-1,所
*以shutdown加锁会不成功,直到队列中任务执行完毕,线程被删除**/
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
9、shutdownNow
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
//线程池状态置为停止
advanceRunState(STOP);
interruptWorkers();
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
//直接中断线程
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
网友评论