主要内容
- ThreadPoolExecutor继承关系
- 实现原理
- 线程池状态
- 线程池的创建
- 任务提交
前言
说到创建线程大家首先会想到调用线程的start()
来创建,但如果在并发环境下,且线程执行时间都较短的话,这样频繁地创建和销毁线程会降低系统的效率。
这时候就需要工具来统一对线程进行分配、监控以及管理。java1.5后引入的Executor
线程池可以达到这样的效果,将任务的定义和执行解耦,定义好线程提交给线程池后,就不用管它怎么执行。接下来主要讲解ThreadPoolExecutor
。
ThreadPoolExecutor
调用初始化线程池方法时发现除了newScheduledThreadPool
特殊以外,其他方法内部都是基于ThreadPoolExecutor
实现的,例如创建固定数量的线程池newFixedThreadPool
、创建单个线程newSingleThreadExecutor
等,代码举例如下。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
因此先来了解下核心类ThreadPoolExecutor
。
继承关系
- Executor:接口,只有一个声明方法,用来向线程池提交任务,由线程池来执行线程。这个方法ThreadPoolExecutor进行具体实现,是核心方法。
void execute(Runnable command);
- AbstractExecutorService:抽象类,实现了ExecutorService接口中的
submit
、invokeAny
和invokeAll
。主要介绍下核心的submit方法,主要用于向线程池提交任务,发现实际上是调用了execute
方法,采用FutureTask
来获取任务执行结果。
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);//新建FutureTask
execute(ftask);
return ftask;
}
线程池原理
线程池状态
/**
ctl变量设计得很巧妙,用这个AtomicInteger来表示workerCount和runState,
其中runState占高3位,后29位为workerCount的值。
workerCount:当前活动的线程数;
runState:线程池的当前状态。
*/
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//COUNT_BITS表示workerCount占的位数,29位
private static final int COUNT_BITS = Integer.SIZE - 3;
//CAPACITY表示的是workderCount能使用的最大个数,值是0001...1(29个1)
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
/**
下面五个值表示的是线程池的状态,存储在高3位。
可以直接用<=>大于来比较状态,running<shutdown<stop<tidying<terminated
*/
//高三位是111。初始时线程处于这个状态,能接受新任务,且处理阻塞队列中的任务。
private static final int RUNNING = -1 << COUNT_BITS;
//高三位是000。不接受新的任务,但会处理完阻塞队列中的任务。
private static final int SHUTDOWN = 0 << COUNT_BITS;
//高三位是001。不接受新的任务,也不处理阻塞队列中的任务,会去尝试中断正在执行的任务。
private static final int STOP = 1 << COUNT_BITS;
//高三位是010。所有任务都被终止了,且当前线程数为0,转到这个状态的线程即将执行terminated()的钩子方法。
private static final int TIDYING = 2 << COUNT_BITS;
//高三位是011。terminated()执行结束。
private static final int TERMINATED = 3 << COUNT_BITS;
/**
这个方法用于取出runState线程池当前状态的值,传入的参数为ctl。
~为按位取反操作,则~CAPACITY值1110...0(29个0),
然后做&操作,低29位置为0,得到了高3位就是runState的值
*/
private static int runStateOf(int c) { return c & ~CAPACITY; }
/**
这个方法用于取出workerCount线程池当前线程个数,传入的参数为ctl。
CAPACITY值0001...1(29个1),
然后做&操作,高3位置为0,得到了低29位就是workerCount的值
*/
private static int workerCountOf(int c) { return c & CAPACITY; }
/**
这个方法用于将runState和workerCount组装成一个值,
传入的参数rs其实就是runState、wc是workerCount。
*/
private static int ctlOf(int rs, int wc) { return rs | wc; }
线程池中状态转换
线程池的创建
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
解释下各个参数。
- corePoolSize:线程池的基本大小。提交任务时,线程池才会创建新线程来执行任务,直到当前线程数等于corePoolSize;如果相等了,继续提交的任务将会放在阻塞队列中等待执行。
- maximumPoolSize:线程池中允许的线程最大数目
-
keepAliveTime:默认情况下,当线程数大于
corePoolSize
时这个参数才起作用,空闲线程等待的时间达到keepAliveTime
后会进入终止状态,直到线程数等于corePoolSize
-
unit:
keepAliveTime
的时间单位,取值在java.util.concurrent.TimeUnit
类中 -
workQueue:阻塞队列,用来保存等待执行的线程,任务必须实现
Runnable
接口,JDK中主要提供了四个阻塞队列:- ** ArrayBlockingQueue**:基于数组实现的FIFO阻塞队列
- ** LinkedBlockingQuene**:基于链表实现的FIFO阻塞队列,静态工厂方法
Executors.newFixedThreadPool()
使用了这个队列。 - ** SynchronousQuene**:不存储元素的阻塞队列,插入操作需要等到另一个线程调用移除操作才可以执行,不然的话就会一直阻塞。静态工厂方法
Executors.newCachedThreadPool()
使用了这个队列。 - ** priorityBlockingQuene**:具有优先级的无限制阻塞队列
- threadFactory:创建新线程的线程工厂类
-
handler:饱和策略,即当线程池处于饱和状态时,队列和线程池都满了,对提交的新任务采取的措施。观察源码发现ThreadPoolExecutor有四个内部类实现了
RejectedExecutionHandler
接口。- AbortPolicy:直接抛出异常,默认饱和策略
- CallerRunsPolicy:只用当前线程来执行任务
- DiscardPolicy:不处理,直接丢弃掉
- DiscardOldestPolicy:将队列最前面的元素丢弃掉,并执行当前任务
任务提交
在ThreadPoolExecutor
类中核心的任务提交方法是executor
方法。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
//如果当前线程数小于corePoolSize的话,增加线程来执行新传入的任务。
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))//添加新线程,true表示要检测当前线程数量小于corePoolSize
return;//创建成功,终止方法的执行
c = ctl.get();//新线程创建失败,记录当前runState和workerCount
}
//当前线程如果是运行状态,并且成功插入到缓冲队列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))//线程池不是运行状态,且成功从缓冲队列中删除任务
reject(command);//使用线程池指定的饱和策略处理任务
else if (workerCountOf(recheck) == 0)//当前活动线程为0
addWorker(null, false);//添加新线程,新建线程对应的任务为null
}
/**
当前线程池不是运行状态;
当前线程池是运行状态,但缓冲队列已经满了不能再添加新任务。
满足任意一个条件尝试着新建线程。
*/
else if (!addWorker(command, false))//新建线程失败,这里的false表示要检测当前线程数量小于maximumPoolSize
reject(command);//使用线程池指定的饱和策略处理任务
}
- 可以发现如果当前线程数小于corePoolSize时,不管当前线程池中的线程是否有空闲的,每调用一次execute方法都会添加新线程,直到当前线程数达到corePoolSize为止;
- 线程数大于等于corePoolSize,小于maximumPoolSize时,优先加入到缓冲队列中
- 如果任务被成功放入队列,还要进行校验是否需要开启新线程,只有当前工作线程数为0才会创建新的线程,之前线程可能因为处于运行状态或者因为线程状态结束而从缓冲队列中移除
- 如果放入队列失败,才会去创建新的工作线程
提交任务过程中有个重要的方法addWorker
,接下来详细介绍这个方法。
/**
* 根据当前线程池状态以及参数来校验是否能新增线程,新增线程
* @param firstTask 这个新增线程的第一个任务
* @param core true表示新增时判断当前活动线程是否小于corePoolSize;false与maximumPoolSize判断
* @return true表示成功新增一个线程,false创建线程失败
*/
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();//获取当前ctl变量值
int rs = runStateOf(c);//当前线程池状态
/**
等价于rs>=SHUTDOWN && (rs!=SHUTDOWN || firstTask!=null || workQueue.isEmpty())
*/
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);//当前线程个数
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))//使用CAS将当前线程个数加一
break retry;//成功的话跳出循环,进入创建新线程的过程
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)//如果线程池状态改变,则进入外循环重新迭代
continue retry;
//因为当前线程个数改变导致的失败,则迭代将当前线程个数加一的操作
}
}
//重头戏来了!!
//接下来是创建新线程的过程
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
final ReentrantLock mainLock = this.mainLock;
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
mainLock.lock();
try {
int c = ctl.get();//再次进行状态的校验
int rs = runStateOf(c);
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive())
throw new IllegalThreadStateException();
workers.add(w);//新创建的线程加入到workers(HashSet)中
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);//创建线程失败
}
return workerStarted;
}
未完。。。
网友评论