1.概述
通常我们在需要使用线程时就去new一个线程,这样实现起来很方便,但是如果并发的线程数量很多,并且每个线程仅仅执行一个时间很短的任务就结束了,这样频繁地创建和销毁线程大大降低了系统的效率。对此,java提供了线程池来使得线程可以复用,在执行完一个任务后并不会立刻销毁,而是可以继续执行其他的任务,有效地节约了系统资源。接下来我们就详细讲解一下java线程池的原理,本文讲解的是jdk1.7的线程池,主要1.7版本的容易理解不少。和1.8版本的有些差异,但原理基本一样,主要是学习思想。
2. 代码讲解
2.1 代码框架介绍
我们先来看下线程池的uml图:
image.png
java.uitl.concurrent.ThreadPoolExecutor是线程池中最核心的一个类,就让我们从这个类入手
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
/* 核心池的大小,创建一个线程池后,默认情况下线程池中线程个数为0,等待有任务到来才创建线程去执行任务,
当线程池中数目达到corePoolsize,就把任务放到缓存队列中, 除非调用了preStartAllCoreThreads(),
预创建线程,在没有任务到来之前就创建corePoolSize个线程
*/
this.corePoolSize = corePoolSize;
// 表示线程池最多能创建多少个线程,
this.maximumPoolSize = maximumPoolSize;
/* 阻塞队列,用来存储等待执行的任务,线程池的排队策略与该参数紧密相关
常用的队列有LinkedBlockingQueue和Synchronous
* */
this.workQueue = workQueue;
/* 表示线程没有任务执行时最多能保持多久时间会终止,默认情况下,只有当线程池中的数目大于corePoolSize,
如果一个线程空闲的时间达到keepAliveTime,该线程会被终止,知道线程数目不超过corePoolSize,但是如果
调用了AllowCoreThreadTimeOut,keepAliveTime也会生效,当线程池线程数目不大于corePoolSize时,也会
终止空闲线程,直到线程数目为0
*/
this.keepAliveTime = unit.toNanos(keepAliveTime);
// 线程工厂, 主要用来创建线程
this.threadFactory = threadFactory;
/* 当拒绝任务时的处理策略,有以下四种取值
* AbortPolicy: 丢弃任务并抛出RejectedExecution
* DiscardPolicy: 丢弃任务,但是不抛出异常
* DiscardOldestPolicy: 丢弃队列最前面的任务
* CallerRunPolicy: 由调用线程处理该任务
* */
this.handler = handler;
}
从上面的uml图可以看到,ThreadPoolExecutor继承了AbstractExecutorService,我们来看下AbstractExecutorService的实现:
public abstract class AbstractExecutorService implements ExecutorService {
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { };
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { };
public Future<?> submit(Runnable task) {};
public <T> Future<T> submit(Runnable task, T result) { };
public <T> Future<T> submit(Callable<T> task) { };
private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
boolean timed, long nanos)
throws InterruptedException, ExecutionException, TimeoutException {
};
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {
};
public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
};
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
};
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException {
};
}
AbstractExecutorService是一个抽象类,它实现了ExecutorService接口,再来看看Executor的接口。
public interface ExecutorService extends Executor {
void shutdown();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
Executor继承了Executor接口:
public interface Executor {
void execute(Runnable command);
}
Executor是一个顶层接口,只声明了void execute(Runnable)方法, 可以理解为执行传进来的任务,ExecutorService接口继承了Executor接口,并声明了submit、invokeAll、shutDown等方法,抽象类AbstractExecutorService实现了ExecutorService接口,基本实现了ExecutorService中声明的所有方法,而ThreadPoolExecutor继承了类AbstractExecutorService, 它有两个很重要的方法:
execute()
submit()
- execute()方法实际上是Executor中声明的方法,在ThreadPoolExecutor进行了具体实现,通过该方法可以向线程池提交一个任务,交给线程池去执行。
- submit()方法是在ExecutorService中声明的方法,在AbstractExecutorService就有了具体实现,ThreadPoolExecutor并没有对其重写,这个方法也是用来向线程池提交任务的,并且它能够返回任务执行的结果,其实底层它也是调用execute()方法,只不过它利用了Future来获取任务执行结果。
2.2 线程池的实现原理
2.2.1 线程池的状态
在ThreadPoolExecutor中定义了一个volatile变量以及几个常量来表示线程的运行状态,
volatile int runState;
static final int RUNNING = 0; // 初始状态处于running
static final int SHUTDOWN = 1; // 调用了shutdown方法处于该状态,此时线程池不能接受新的任务,它会等待所有任务执行完毕
static final int STOP = 2; // 调用shutdownNow(),线程池处于stop状态,不能接受新的任务,并且会去尝试终止正在执行的任务
static final int TERMINATED = 3; // 当状态为1或2,并且所有工作线程已经销毁,任务缓存队列已经清空,线程池被设置成terminated状态。
2.2.2 任务的执行过程
ThreadPoolExecutor的execute()方法,
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) { // 如果线程池中当前线程数目大于核心池大小, 就会直接进入if语句模块, 如果小于核心池大小则调用addIfUnderCorePoolSize方法
if (runState == RUNNING && workQueue.offer(command)) { // 如果当前线程处于running状态,则将任务放入任务缓存队列
if (runState != RUNNING || poolSize == 0){ // 为了防止将任务放入缓存队列的同时,其他线程突然调用shutdown或者shutdownNow方法关闭了线程池
// 确保添加到缓存队列的任务能够得到处理
ensureQueuedTaskHandled(command);
}
else if (!addIfUnderMaximumPoolSize(command))
reject(command); // is shutdown or saturated
}
}
来看看两个关键的方法:addIfUnderCorePoolSize和addIfunderMaximumPoolsize:
private boolean addIfUnderCorePoolSize(Runnable firstTask) {
Thread t = null;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock(); // 对线程池进行加锁访问
try {
if (poolSize < corePoolSize && runState == RUNNING) // 虽然execute()方法判断过,但之前的判断并没有加锁,可能会有并发线程导致poolsize发生了变化,在这里需要重新判断
t = addThread(firstTask); // 创建线程去执行firstTask任务
} finally {
mainLock.unlock();
}
if (t == null)
return false;
t.start();
return true;
}
上面方法的关键在addThread方法:
private Thread addThread(Runnable firstTask) {
Worker w = new Worker(firstTask);
Thread t = threadFactory.newThread(w); // 创建一个线程,执行任务
if (t != null) {
w.thread = t; // 将创建的线程的引用赋值为w的成员变量
workers.add(w);
int nt = ++poolSize; //当前线程数加1
if (nt > largestPoolSize)
largestPoolSize = nt;
}
return t;
}
在addThread方法中,首先用提交的任务创建了一个Worker对象,然后使用工厂创建了一个新的线程t,并将线程t的引用赋值给了worker对象的thread成员变量,我们来看下Worker类的实现:
private final class Worker implements Runnable {
private final ReentrantLock runLock = new ReentrantLock();
private Runnable firstTask;
volatile long completedTasks;
Thread thread;
Worker(Runnable firstTask) {
this.firstTask = firstTask;
}
boolean isActive() {
return runLock.isLocked();
}
void interruptIfIdle() {
final ReentrantLock runLock = this.runLock;
if (runLock.tryLock()) {
try {
if (thread != Thread.currentThread())
thread.interrupt();
} finally {
runLock.unlock();
}
}
}
void interruptNow() {
thread.interrupt();
}
private void runTask(Runnable task) {
final ReentrantLock runLock = this.runLock;
runLock.lock();
try {
if (runState < STOP &&
Thread.interrupted() &&
runState >= STOP)
boolean ran = false;
beforeExecute(thread, task); //beforeExecute方法是ThreadPoolExecutor类的一个方法,没有具体实现,用户可以根据
//自己需要重载这个方法和后面的afterExecute方法来进行一些统计信息,比如某个任务的执行时间等
try {
task.run();
ran = true;
afterExecute(task, null);
++completedTasks;
} catch (RuntimeException ex) {
if (!ran)
afterExecute(task, ex);
throw ex;
}
} finally {
runLock.unlock();
}
}
public void run() {
try {
Runnable task = firstTask;
firstTask = null;
while (task != null || (task = getTask()) != null) {// 首先执行传进来的task,然后不断通过getTask()去取新的任务来执行
runTask(task); // 执行任务
task = null;
}
} finally {
workerDone(this); //线程退出,进行相关清理工作
}
}
}
从run()方法可以看出,它首先执行的是通过构造器传进来的任务,在调用runTask()执行完firstTask之后便不断通过getTask()去领取新的任务来执行,我们来看看它是怎么领取的
Runnable getTask() {
for (;;) {
try {
int state = runState;
if (state > SHUTDOWN) // 如果线程池状态为stop或者terminated,直接返回null
return null;
Runnable r;
if (state == SHUTDOWN)
r = workQueue.poll(); // 从任务缓存队列取任务
else if (poolSize > corePoolSize || allowCoreThreadTimeOut) //如果线程数大于核心池大小或者允许为核心池线程设置空闲时间,
//则通过poll取任务,若等待一定的时间取不到任务,则返回null
r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS); // 线程空闲的时间达到keepAliveTime后 会被清理
else
r = workQueue.take(); //如果没有任务了 会阻塞在这
if (r != null)
return r;
if (workerCanExit()) { //如果没取到任务,即r为null,则判断当前的worker是否可以退出
if (runState >= SHUTDOWN) // Wake up others
interruptIdleWorkers(); //中断处于空闲状态的worker
return null;
}
// Else retry
} catch (InterruptedException ie) {
// On interruption, re-check runState
}
}
}
如果获取的任务为null,则可以通过workerCanExit()方法来判断当前worker是否可以退出,
private boolean workerCanExit() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
boolean canExit;
//如果runState大于等于STOP,或者任务缓存队列为空了
//或者 允许为核心池线程设置空闲存活时间并且线程池中的线程数目大于1
try {
canExit = runState >= STOP ||
workQueue.isEmpty() ||
(allowCoreThreadTimeOut &&
poolSize > Math.max(1, corePoolSize));
} finally {
mainLock.unlock();
}
return canExit;
}
如果允许worker退出,则调用interruptWorkers()中断处于空闲状态的worker,
void interruptIdleWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) //实际上调用的是worker的interruptIfIdle()方法
w.interruptIfIdle();
} finally {
mainLock.unlock();
}
}
void interruptIfIdle() {
final ReentrantLock runLock = this.runLock;
if (runLock.tryLock()) { //注意这里,是调用tryLock()来获取锁的,因为如果当前worker正在执行任务,锁已经被获取了,是无法获取到锁的
//如果成功获取了锁,说明当前worker处于空闲状态
try {
if (thread != Thread.currentThread())
thread.interrupt();
} finally {
runLock.unlock();
}
}
}
这里通过直接让执行完任务的线程去任务缓存队列中去获取任务来执行的设计很巧妙,而不是通常的专门需要个任务分派线程,当发现有线程空闲时,任务分派线程就从任务缓存队列中取一个任务交给空闲线程执行,这样会额外需要对任务分派线程进行管理,而jdk线程池的这种让任务执行线程自己去取的方式巧妙地避免了这一问题。
到此,我们先总结一下线程池的大概流程:
- 当一个任务提交给线程池之后,如果当前线程池中的线程数目<corePoolsize,则对新提交的任务直接创建一个线程去执行,
- 如果当前线程数目>=corePoolSize,每来一个任务会尝试将其添加到任务缓存队列,若添加成功,该任务将会等待空闲线程将其取出去执行,若任务队列已满,添加失败,则会采取任务拒绝策略进行处理。
- 如果线程池的数量>corePoolSize,当有线程的空闲时间超过keepAliveTime,线程将会终止。
2.2.3 线程池中的线程初始化
默认情况下,创建线程池之后,线程池中是没有线程的,线程的创建是懒加载方式,需要提交任务时才会创建,如果想在创建线程池之后就立刻又线程,可以调用jdk提供了这两个方法:
- prestartCoreThread() 初始化一个核心线程
- prestartAllCoreThread() 初始化所有的核心线程
public boolean prestartCoreThread() {
return addIfUnderCorePoolSize(null); //注意传进去的参数是null
}
public int prestartAllCoreThreads() {
int n = 0;
while (addIfUnderCorePoolSize(null)) //注意传进去的参数是null, 会让线程阻塞在getTask方法里的workQueue.take().
++n;
return n;
}
2.2.4 任务缓存队列及排队策略
任务缓存队列用来存放等待执行的任务,workQueue的类型为BlockingQueue<Runnable>,一般使用下面三种类型的Queue
- ArrayBlockingQueue: 基于数组的先进先出队列,创建时必须指定大小
- LinkedBlockingQueue: 基于链表的先进先出队列,默认大小为Interger.MAX_VALUE;
- SynchronousQueue: 当客户端提交一个任务时,线程池中又没有空闲的线程能够从SynchronousQueue 队列中取任务,那么相应的offer方法调用就会失败(即任务没有被存入工作队列)。此时,ThreadPoolExecutor会新建一个新的工作者线程用于对这个入队列失败的任务进行处理(假设此时线程池的大小还未达到其最大线程池大小)。
3. 使用示例
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class Test {
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(5));
for(int i=0;i<15;i++){
MyTask myTask = new MyTask(i);
executor.execute(myTask);
System.out.println("线程池中线程数目:"+executor.getPoolSize()+",队列中等待执行的任务数目:"+
executor.getQueue().size()+",已执行玩别的任务数目:"+executor.getCompletedTaskCount());
}
executor.shutdown();
}
}
class MyTask implements Runnable {
private int taskNum;
public MyTask(int num) {
this.taskNum = num;
}
@Override
public void run() {
System.out.println("正在执行task "+taskNum);
try {
Thread.currentThread().sleep(4000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("task "+taskNum+"执行完毕");
}
}
执行结果:
正在执行task 0
线程池中线程数目:1,队列中等待执行的任务数目:0,已执行玩别的任务数目:0
线程池中线程数目:2,队列中等待执行的任务数目:0,已执行玩别的任务数目:0
正在执行task 1
线程池中线程数目:3,队列中等待执行的任务数目:0,已执行玩别的任务数目:0
正在执行task 2
正在执行task 3
线程池中线程数目:4,队列中等待执行的任务数目:0,已执行玩别的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:0,已执行玩别的任务数目:0
正在执行task 4
线程池中线程数目:5,队列中等待执行的任务数目:1,已执行玩别的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:2,已执行玩别的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:3,已执行玩别的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:4,已执行玩别的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:5,已执行玩别的任务数目:0
线程池中线程数目:6,队列中等待执行的任务数目:5,已执行玩别的任务数目:0
正在执行task 10
正在执行task 11
线程池中线程数目:7,队列中等待执行的任务数目:5,已执行玩别的任务数目:0
线程池中线程数目:8,队列中等待执行的任务数目:5,已执行玩别的任务数目:0
正在执行task 12
正在执行task 13
线程池中线程数目:9,队列中等待执行的任务数目:5,已执行玩别的任务数目:0
线程池中线程数目:10,队列中等待执行的任务数目:5,已执行玩别的任务数目:0
正在执行task 14
task 0执行完毕
task 2执行完毕
正在执行task 5
task 1执行完毕
task 4执行完毕
task 10执行完毕
task 3执行完毕
正在执行task 6
task 11执行完毕
正在执行task 9
正在执行task 8
正在执行task 7
task 12执行完毕
task 14执行完毕
task 13执行完毕
task 5执行完毕
task 6执行完毕
task 9执行完毕
task 8执行完毕
task 7执行完毕
从执行结果可以看出,当线程池中线程数目大于5,再提交的任务就放入任务缓存队列,当任务缓存队列慢了之后,便创建新的线程,如果for循环改成执行20个任务,新创建的线程数目达到线程池最大数目时,就会抛出任务拒绝异常。
不过在使用线程时,java doc不推荐我们直接使用ThreadPoolExecutor,因为配置参数较多,不是很友好,而是提倡我们使用Executors的几个静态方法来创建线程池,这些静态方法对ThreadPoolExecutor进行了很好的封装,使用起来更友好,当然如果这些api都达不到要求,可以自己继承ThreadPoolExecutor进行重写。
Executors.newCachedThreadPool(); //创建一个缓冲池,缓冲池容量大小为Integer.MAX_VALUE
Executors.newSingleThreadExecutor(); //创建容量为1的缓冲池
Executors.newFixedThreadPool(int); //创建固定容量大小的缓冲池
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newCachedThreadPool() {
// 注意核心池大小为0, 来了任务就创建线程, 当线程空闲超过60s 就销毁线程
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
网友评论