ThreadPoolExecutor
public class ThreadPoolExecutor extends AbstractExecutorService {
...
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler);
}
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
...
}
ThreadPoolExecutor 四个构造器.png
看源码可知前三个构造器最终调用的都是第四个进行初始化工作。
workQueue
等待队列
- ArrayBlockingQueue 使用较少。必须指定
capacity
,即有界队列 - PriorityBlockingQueue 使用较少。默认大小
DEFAULT_INITIAL_CAPACITY = 11
,最大MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8
,即无界有序队列 - LinkedBlockingQueue 默认大小
Integer.MAX_VALUE
,即无界队列 - SynchronousQueue 内部并没有数据缓存空间,一旦有了插入线程和移除线程,元素很快就从插入线程移交给移除线程(
快速传递元素的方式
),在多任务队列中是最快处理任务的方式,元素总是以最快的方式从生产者传递给消费者。典型应用是Executors.newCachedThreadPool(),这个线程池根据需要(新任务到来时)创建新的线程,如果有空闲线程则会重复使用,线程空闲了60秒后会被回收。 - 线程池的排队策略与所选的 BlockingQueue 有关。
handler
拒绝处理任务时使用的策略
- ThreadPoolExecutor.AbortPolicy 丢弃任务并抛出RejectedExecutionException异常,会阻止正常工作。
- ThreadPoolExecutor.DiscardPolicy 丢弃任务,但是不抛出异常,系统正常工作。
- ThreadPoolExecutor.DiscardOldestPolicy 丢弃队列最前面的任务(即将被执行的),然后重新尝试提交当前任务(重复此过程)
- ThreadPoolExecutor.CallerRunsPolicy 由调用线程处理该任务
核心方法:
execute()
是在Executor接口中的声明,通过这个方法可以向线程池提交一个任务,交由线程池去执行
submit()
是在ExecutorService中声明的方法,在AbstractExecutorService就已经有了具体的实现,在ThreadPoolExecutor中并没有对其进行重写,这个方法也是用来向线程池提交任务的,但是它和execute()方法不同,它能够返回任务执行的结果,去看submit()方法的实现,会发现它实际上还是调用的execute()方法,只不过它利用了Future来获取任务执行结果
shutdown()
不会立即终止线程池,而是要等所有任务缓存队列中的任务都执行完后才终止,但再也不会接受新的任务
shutdownNow()
立即终止线程池,并尝试打断正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务
线程池状态:
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
/**
* RUNNING: Accept new tasks and process queued tasks
* SHUTDOWN: Don't accept new tasks, but process queued tasks
* STOP: Don't accept new tasks, don't process queued tasks,
* and interrupt in-progress tasks
* TIDYING: All tasks have terminated, workerCount is zero,
* the thread transitioning to state TIDYING
* will run the terminated() hook method
* TERMINATED: terminated() has completed
*
* RUNNING -> SHUTDOWN
* On invocation of shutdown(), perhaps implicitly in finalize()
* (RUNNING or SHUTDOWN) -> STOP
* On invocation of shutdownNow()
* SHUTDOWN -> TIDYING
* When both queue and pool are empty
* STOP -> TIDYING
* When pool is empty
* TIDYING -> TERMINATED
* When the terminated() hook method has completed
**/
- 创建线程池后,初始时,线程池处于RUNNING状态;
- 调用 shutdown(),则线程池处于SHUTDOWN状态,此时线程池不能够接受新的任务,它会等待所有任务执行完毕;
- 调用 shutdownNow(),则线程池处于STOP状态,此时线程池不能接受新的任务,并且尝试终止正在执行的任务;
- 当线程池处于SHUTDOWN或STOP状态,队列和线程池中都为空的情况,即所有任务都已被终止,workerCount 标记为 0,则处于TIDYING状态
- 当线程池处于SHUTDOWN或STOP状态,并且所有工作线程已经销毁(处于TIDYING状态),任务缓存队列已经清空或执行结束后,线程池被设置为TERMINATED状态。
任务的执行相关重要参数
// 任务缓存队列,用来存放等待执行的任务
private final BlockingQueue<Runnable> workQueue;
// 线程池的主要状态锁,对线程池状态(比如线程池大小、runState等)的改变都要使用这个锁
private final ReentrantLock mainLock = new ReentrantLock();
// Accessed only under mainLock.
// 用来存放工作集
private final HashSet<Worker> workers = new HashSet<Worker>();
// 用来记录线程池中曾经出现过的最大线程数
private int largestPoolSize;
// 用来记录已经执行完毕的任务个数
private long completedTaskCount;
// 线程工厂类,用来创建线程
private volatile ThreadFactory threadFactory;
// 任务拒绝策略
private volatile RejectedExecutionHandler handler;
// 线程存活时间
private volatile long keepAliveTime;
// 是否允许为核心线程设置存活时间
private volatile boolean allowCoreThreadTimeOut;
// 核心池的大小(即线程池中的线程数目大于这个参数时,提交的任务会被放进任务缓存队列)
private volatile int corePoolSize;
// //线程池最大能容忍的线程数. Note that the actual maximum is internally bounded by CAPACITY.
private volatile int maximumPoolSize;
// 默认的任务拒绝策略:丢弃任务并抛出RejectedExecutionException异常
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
corePoolSize
是正常情况下线程池大小,maximumPoolSize
是线程池的一种额外Support,即任务量突然过大时的额外可支持的开销
largestPoolSize
只是用来记录线程池中曾经有过的最大线程数目,跟线程池的容量没有任何关系。
execute()
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
/** 1. If fewer than corePoolSize threads are running, try to start a new thread with the given command as its first task.
* The call to addWorker atomically checks runState and workerCount, and so prevents false alarms that would add threads when it shouldn't, by returning false.
*/
// 线程池中正在作业的线程数 < corePoolSize数
if (workerCountOf(c) < corePoolSize) {
// 是否可以继续向 corePool 中新增线程
if (addWorker(command, true))
return;
// 再次获取当前线程池状态值
c = ctl.get();
}
/** 2. If a task can be successfully queued, then we still need to double-check whether we should have added a thread (because existing ones died since last checking) or that the pool shut down since entry into this method.
* So we recheck state and if necessary roll back the enqueuing if stopped, or start a new thread if there are none.
*/
// 线程池为可运行状态,同时Runnable command可以加入等待队列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 非Running状态,则从队列中去掉command并执行拒绝策略
if (! isRunning(recheck) && remove(command))
reject(command);
/** 3. If we cannot queue task, then we try to add a new thread.
* If it fails, we know we are shut down or saturated and so reject the task.
*/
// 新增thread
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 不可以在拓展的线程池中运行该实例,执行拒绝策略
else if (!addWorker(command, false))
reject(command);
}
使用 ThreadPoolExecutor 创建线程池:
public class HelloThreadPool {
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 2,
100, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(2));
IntStream.range(0, 4).mapToObj(PrintTask::new).forEach(printTask -> {
executor.execute(printTask);
System.out.println("线程池中所有线程数目:" + executor.getPoolSize() + ",队列中待执行的任务数目:" +
executor.getQueue().size() + ",已执行完的任务数目:" + executor.getCompletedTaskCount());
});
executor.shutdown();
}
}
class PrintTask implements Runnable {
private int taskIndex;
PrintTask(int index) {
this.taskIndex = index;
}
@Override
public void run() {
System.out.println(taskIndex + " is running...");
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(taskIndex + " end");
}
}
output:
0 is running...
线程池中所有线程数目:1,队列中待执行的任务数目:0,已执行完的任务数目:0
线程池中所有线程数目:1,队列中待执行的任务数目:1,已执行完的任务数目:0
线程池中所有线程数目:1,队列中待执行的任务数目:2,已执行完的任务数目:0
线程池中所有线程数目:2,队列中待执行的任务数目:2,已执行完的任务数目:0
3 is running...
3 end
0 end
1 is running...
2 is running...
2 end
1 end
当把要执行的实例变成 5 个就会出现 RejectedExecutionException
异常:
java.util.concurrent.RejectedExecutionException:
Task threadPool.PrintTask@7699a589 rejected from
java.util.concurrent.ThreadPoolExecutor@58372a00[Running, pool size = 2, active threads = 2, queued tasks = 2, completed tasks = 0]
java doc中,并不提倡我们直接使用 ThreadPoolExecutor ,而是使用 Executors 类中提供的几个静态方法来创建线程池:
-
Executors.newCachedThreadPool()
若线程池的当前规模超过了`corePoolSize`,就会回收部分空闲的线程(根据`keepAliveTime`来回收),当需求增加时,线程池又可以智能的添加新线程来处理任务。此线程池大小`Integer.MAX_VALUE`可以认为是不做限制(使用队列`SynchronousQueue`),线程池大小完全依赖于JVM能够创建的最大线程大小 -
Executors.newSingleThreadExecutor()
创建容量为1的线程池,`corePoolSize`和`maximumPoolSize`均为1,使用无界队列`LinkedBlockingQueue` -
Executors.newFixedThreadPool(int)
创建容量为固定个数n的线程池。`corePoolSize`和`maximumPoolSize`均为n,使用无界队列`LinkedBlockingQueue` -
Executors.newScheduledThreadPool(int)
创建一个指定corePoolSize的线程池,支持定时及周期性任务执行 -
如果ThreadPoolExecutor达不到要求,可以自己继承ThreadPoolExecutor类进行重写
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
}
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new DelegatedScheduledExecutorService(new ScheduledThreadPoolExecutor(1));
}
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
网友评论