通过new Thread来创建一个线程,由于线程的创建和销毁都需要消耗一定的CPU资源,所以在高并发下这种创建线程的方式将严重影响代码执行效率。而线程池的作用就是让一个线程执行结束后不马上销毁,继续执行新的任务,这样就节省了不断创建线程和销毁线程的开销。
- ThreadPoolExecutor
- 关闭线程池
- 4种拒绝策略
3.1 CallerRunsPolicy
3.2 AbortPolicy
3.3 DiscardOldestPolicy
3.4 DiscardPolicy - 线程池的工厂方法
4.1 newFixedThreadPool
4.2 newCachedThreadPool
4.3 newSingleThreadPool
4.4 newScheduleThreadPool - ThreadPoolExecutor 一些API的用法
ThreadPoolExecutor
创建Java线程池最核心的类 ThreadPoolExecutor:
image.png
它提供了四种构造函数,最核心的构造函数如下:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
这7个参数的含义如下:
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
* creates a new thread
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code threadFactory} or {@code handler} is null
*/
- corePoolSize 线程池核心线程数。即线程池中保留的线程个数,即使这些线程是空闲的,也不会被销毁,除非通过ThreadPoolExecutor的allowCoreThreadTimeOut(true)方法开启了核心线程的超时策略;
- maximumPoolSize 线程池中允许的最大线程个数;
- keepAliveTime 用于设置那些超出核心线程数量的线程的最大等待时间,超过这个时间还没有新任务的话,超出的线程将被销毁;
- unit 超时时间单位;
- workQueue 线程队列。用于保存通过execute方法提交的,等待被执行的任务;
- threadFactory 线程创建工程,即指定怎样创建线程;
- handler 拒绝策略。即指定当线程提交的数量超出了maximumPoolSize后,该使用什么策略处理超出的线程。
下面看个例子,加入理解:
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
1, 2, 10,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1),
Thread::new,
new ThreadPoolExecutor.AbortPolicy());
System.out.println("=== 线程池创建完毕");
int activeCount = -1;
int queueSize = -1;
while (true) {
if (activeCount != threadPoolExecutor.getActiveCount()
|| queueSize != threadPoolExecutor.getQueue().size()) {
System.out.println("=== 活跃线程个数: " + threadPoolExecutor.getActiveCount());
System.out.println("=== 核心线程个数: " + threadPoolExecutor.getCorePoolSize());
System.out.println("=== 队列线程个数: " + threadPoolExecutor.getQueue().size());
System.out.println("=== 最大线程数 : " + threadPoolExecutor.getMaximumPoolSize());
System.out.println("-------------------------------------");
activeCount = threadPoolExecutor.getActiveCount();
queueSize = threadPoolExecutor.getQueue().size();
}
}
}
上面的代码创建了一个核心线程数量为1,允许最大线程数量为2,最大活跃时间为10秒,线程队列长度为1的线程池。
我们通过execute方法向线程池提交一个任务。
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
1, 2, 10,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1),
Thread::new,
new ThreadPoolExecutor.AbortPolicy());
System.out.println("=== 线程池创建完毕");
threadPoolExecutor.execute(() -> sleep(10));
int activeCount = -1;
int queueSize = -1;
while (true) {
if (activeCount != threadPoolExecutor.getActiveCount()
|| queueSize != threadPoolExecutor.getQueue().size()) {
System.out.println("=== 活跃线程个数: " + threadPoolExecutor.getActiveCount());
System.out.println("=== 核心线程个数: " + threadPoolExecutor.getCorePoolSize());
System.out.println("=== 队列线程个数: " + threadPoolExecutor.getQueue().size());
System.out.println("=== 最大线程数 : " + threadPoolExecutor.getMaximumPoolSize());
System.out.println("-------------------------------------");
activeCount = threadPoolExecutor.getActiveCount();
queueSize = threadPoolExecutor.getQueue().size();
}
}
}
private static void sleep(long value) {
try {
System.out.println("=== " + Thread.currentThread().getName() + "线程执行sleep方法");
TimeUnit.SECONDS.sleep(value);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
ThreadPoolExecutor的execute和submit方法都可以向线程池提交任务,区别是,submit方法能够返回执行结果,返回值类型为Future
启动线程,控制台如下输出:
=== 线程池创建完毕
=== 活跃线程个数: 1
=== Thread-0线程执行sleep方法
=== 核心线程个数: 1
=== 队列线程个数: 0
=== 最大线程数 : 2
-------------------------------------
10秒后,控制台如下输出:
=== 线程池创建完毕
=== 活跃线程个数: 1
=== Thread-0线程执行sleep方法
=== 核心线程个数: 1
=== 队列线程个数: 0
=== 最大线程数 : 2
-------------------------------------
=== 活跃线程个数: 0
=== 核心线程个数: 1
=== 队列线程个数: 0
=== 最大线程数 : 2
-------------------------------------
线程核心线程数量为1,通过execute提交一个任务之后,由于核心线程是空的,所以任务被执行了,这个任务的逻辑是休眠10秒,所以这10秒内,线程池的活跃线程数量为1。此外,并没有线程需要放到线程队列里等待,线程队列长度为0,
10秒后任务执行完成,活跃线程个数变为0。
我们通过execute方法向线程池提交2个任务,看看结果如何:
threadPoolExecutor.execute(() -> sleep(10));
threadPoolExecutor.execute(() -> sleep(10));
启动线程,控制结果如下:
=== 线程池创建完毕
=== Thread-0线程执行sleep方法
=== 活跃线程个数: 1
=== 核心线程个数: 1
=== 队列线程个数: 1
=== 最大线程数 : 2
-------------------------------------
=== Thread-0线程执行sleep方法
=== 活跃线程个数: 1
=== 核心线程个数: 1
=== 队列线程个数: 0
=== 最大线程数 : 2
-------------------------------------
=== 活跃线程个数: 0
=== 核心线程个数: 1
=== 队列线程个数: 0
=== 最大线程数 : 2
-------------------------------------
活跃个数为1,队列线程数为1,
10秒后,
活跃个数为1,队列线程数为0,
10秒后,
活跃个数为0,队列线程数为0。
我们通过execute方法向线程池提交3个任务,结果:
threadPoolExecutor.execute(() -> sleep(10));
threadPoolExecutor.execute(() -> sleep(10));
threadPoolExecutor.execute(() -> sleep(10));
启动线程,控制台结果如下:
=== 线程池创建完毕
=== Thread-0线程执行sleep方法
=== Thread-1线程执行sleep方法
=== 活跃线程个数: 2
=== 核心线程个数: 1
=== 队列线程个数: 1
=== 最大线程数 : 2
-------------------------------------
=== Thread-1线程执行sleep方法
=== 活跃线程个数: 1
=== 核心线程个数: 1
=== 队列线程个数: 0
=== 最大线程数 : 2
-------------------------------------
=== 活跃线程个数: 0
=== 核心线程个数: 1
=== 队列线程个数: 0
=== 最大线程数 : 2
-------------------------------------
最大线程数为2,提交了3个任务,队列中也有等待任务,所以看到
活跃线程个数2,
队列线程个数1,
10秒后,
活跃线程个数1,
队列线程个数0,
10秒后,
活跃线程个数0,
队列线程个数1。
我们通过execute方法向线程池提交4个任务:
threadPoolExecutor.execute(() -> sleep(10));
threadPoolExecutor.execute(() -> sleep(10));
threadPoolExecutor.execute(() -> sleep(10));
threadPoolExecutor.execute(() -> sleep(10));
启动线程,控制台结果如下:
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task cc.mrbird.demo.MainDemo$$Lambda$5/159413332@3d494fbf rejected from java.util.concurrent.ThreadPoolExecutor@1ddc4ec2[Running, pool size = 2, active threads = 2, queued tasks = 1, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
at cc.mrbird.demo.MainDemo.main(MainDemo.java:24)
=== 线程池创建完毕
=== Thread-0线程执行sleep方法
=== Thread-1线程执行sleep方法
=== Thread-0线程执行sleep方法
因为我们设置的拒绝策略为AbortPolicy,所以最后提交的那个任务直接被拒绝了。更多拒绝策略下面会介绍到。
关闭线程池
当线程池中所有任务都处理完毕后,线程并不会自己关闭。我们可以通过调用shutdown和shutdownNow方法来关闭线程池。两者的区别在于:
shutdown: Initiates an orderly shutdown in which previously submitted tasks are executed, but no new tasks will be accepted. Invocation has no additional effect if already shut down.
启动有序关闭,在此过程中执行先前提交的任务,但不接受任何新任务。如果已经关闭,调用不会产生额外的效果。这种方便平滑的关闭线程池。
shutdownNow :Attempts to stop all actively executing tasks, halts the processing of waiting tasks, and returns a list of the tasks that were awaiting execution. These tasks are drained (removed) from the task queue upon return from this method.
尝试停止所有正在积极执行的任务,停止处理等待的任务,并返回等待执行的任务列表。从该方法返回时,将从任务队列中删除这些任务。这种方法比较暴力。
通过代码去实践下:
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
2,
4,
10, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(2),
Thread::new,
new ThreadPoolExecutor.AbortPolicy());
threadPoolExecutor.execute(new ShortTask());
threadPoolExecutor.execute(new LongTask());
threadPoolExecutor.execute(new ShortTask());
threadPoolExecutor.execute(new LongTask());
threadPoolExecutor.shutdown();
System.out.println("=== 已经执行了线程的shutdown方法");
}
static class ShortTask implements Runnable {
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(1);
System.out.println("=== " + Thread.currentThread().getName() + " 执行shortTask完毕");
} catch (InterruptedException e) {
System.out.println("=== shortTask执行过程中被打断" + e.getMessage());
}
}
}
static class LongTask implements Runnable {
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(5);
System.out.println("=== " + Thread.currentThread().getName() + " 执行longTask完毕");
} catch (InterruptedException e) {
System.out.println("=== longTask执行过程中被打断" + e.getMessage());
}
}
}
执行结果:
=== 已经执行了线程的shutdown方法
=== Thread-0 执行shortTask完毕
=== Thread-0 执行shortTask完毕
=== Thread-1 执行longTask完毕
=== Thread-0 执行longTask完毕
可以看到,已经执行了shutdown()方法,并不会立即关闭线程池,而是等待所有被提交的任务都执行完了才关闭。
shutdownNow()例子:
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
2,
4,
10, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(2),
Thread::new,
new ThreadPoolExecutor.AbortPolicy());
threadPoolExecutor.execute(new ShortTask());
threadPoolExecutor.execute(new LongTask());
threadPoolExecutor.execute(new ShortTask());
threadPoolExecutor.execute(new LongTask());
List<Runnable> runnables = threadPoolExecutor.shutdownNow();
System.out.println("** " + runnables);
System.out.println("=== 已经执行了线程的shutdown方法");
}
static class ShortTask implements Runnable {
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(1);
System.out.println("=== " + Thread.currentThread().getName() + " 执行shortTask完毕");
} catch (InterruptedException e) {
System.out.println("=== shortTask执行过程中被打断" + e.getMessage());
}
}
}
static class LongTask implements Runnable {
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(5);
System.out.println("=== " + Thread.currentThread().getName() + " 执行longTask完毕");
} catch (InterruptedException e) {
System.out.println("=== longTask执行过程中被打断" + e.getMessage());
}
}
}
执行结果:
=== longTask执行过程中被打断sleep interrupted
** [cc.mrbird.demo.MainDemo2$ShortTask@27bc2616, cc.mrbird.demo.MainDemo2$LongTask@3941a79c]
=== 已经执行了线程的shutdown方法
=== shortTask执行过程中被打断sleep interrupted
可以看到,在执行shutdownNow方法后,线程池马上被关闭,正在被执行的两个任务被打断,并且返回了线程队列中等待被执行的两个任务。
4种拒绝策略
当线程池无法接收新的任务的时候,可采取如下策略:
image.png
AbortPolicy策略: 丢弃任务,并抛出异常
上面那个例子就是AbortPolicy拒绝策略。
CallerRunsPolicy策略: 由调用线程处理该任务
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
1, 2, 10,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1),
Thread::new,
new ThreadPoolExecutor.CallerRunsPolicy());
System.out.println("=== 线程池创建完毕");
threadPoolExecutor.execute(() -> sleep(5));
threadPoolExecutor.execute(() -> sleep(5));
threadPoolExecutor.execute(() -> sleep(5));
threadPoolExecutor.execute(() -> sleep(5));
}
private static void sleep(long value) {
try {
System.out.println("=== " + Thread.currentThread().getName() + "线程执行sleep方法");
TimeUnit.SECONDS.sleep(value);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
执行结果:
=== 线程池创建完毕
=== Thread-0线程执行sleep方法
=== main线程执行sleep方法
=== Thread-1线程执行sleep方法
=== Thread-1线程执行sleep方法
DiscardOldestPolicy策略:丢弃最早被放入到线程队列的任务,将新提交的任务放入到线程队列末端
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
1, 2, 10,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1),
Thread::new,
new ThreadPoolExecutor.DiscardOldestPolicy());
System.out.println("=== 线程池创建完毕");
threadPoolExecutor.execute(() -> sleep(5,"任务一"));
threadPoolExecutor.execute(() -> sleep(5,"任务二"));
threadPoolExecutor.execute(() -> sleep(5,"任务三"));
threadPoolExecutor.execute(() -> sleep(5,"任务四"));
}
private static void sleep(long value,String name) {
try {
System.out.println("=== " + Thread.currentThread().getName() + "线程执行sleep方法 "+name);
TimeUnit.SECONDS.sleep(value);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
执行结果:
=== 线程池创建完毕
=== Thread-0线程执行sleep方法 任务一
=== Thread-1线程执行sleep方法 任务三
=== Thread-0线程执行sleep方法 任务四
DiscardPolicy策略:直接丢弃新的任务,不抛异常
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
1, 2, 10,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1),
Thread::new,
new ThreadPoolExecutor.DiscardPolicy());
System.out.println("=== 线程池创建完毕");
threadPoolExecutor.execute(() -> sleep(5,"任务一"));
threadPoolExecutor.execute(() -> sleep(5,"任务二"));
threadPoolExecutor.execute(() -> sleep(5,"任务三"));
threadPoolExecutor.execute(() -> sleep(5,"任务四"));
}
private static void sleep(long value,String name) {
try {
System.out.println("=== " + Thread.currentThread().getName() + "线程执行sleep方法 "+name);
TimeUnit.SECONDS.sleep(value);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
执行结果:
=== 线程池创建完毕
=== Thread-0线程执行sleep方法 任务一
=== Thread-1线程执行sleep方法 任务三
=== Thread-0线程执行sleep方法 任务二
线程池的工厂方法
除了使用ThreadPoolExecutor的构造方法创建线程池外,我们也可以使用Executors提供的工厂方法来创建不同类型的线程池:
ExecutorService executorService01 = Executors.newFixedThreadPool(5);
ExecutorService executorService02 = Executors.newCachedThreadPool();
ExecutorService executorService03 = Executors.newSingleThreadExecutor();
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5);
newFixedThreadPool
查看newFixedThreadPool方法源码:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
可以看到,通过newFixedThreadPool创建的是一个固定大小的线程池,大小由nThreads参数指定,它具有如下几个特点:
-
因为corePoolSize和maximumPoolSize的值都为nThreads,所以线程池中线程数量永远等于nThreads,不可能新建除了核心线程数的线程来处理任务,即keepAliveTime实际上在这里是无效的。
-
LinkedBlockingQueue是一个无界队列(最大长度为Integer.MAX_VALUE),所以这个线程池理论是可以无限的接收新的任务,这就是为什么上面没有指定拒绝策略的原因。
newCachedThreadPool
查看newCachedThreadPool方法源码:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
这是一个理论上无限大小的线程池:
-
核心线程数为0,SynchronousQueue队列是没有长度的队列,所以当有新的任务提交,如果有空闲的还未超时的(最大空闲时间60秒)线程则执行该任务,否则新增一个线程来处理该任务。
-
因为线程数量没有限制,理论上可以接收无限个新任务,所以这里也没有指定拒绝策略。
newSingleThreadExecutor
查看newSingleThreadExecutor源码:
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
- 核心线程数和最大线程数都为1,每次只能有一个线程处理任务。
- LinkedBlockingQueue队列可以接收无限个新任务。
newScheduledThreadPool
查看newScheduledThreadPool源码:
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
......
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
所以newScheduledThreadPool理论是也是可以接收无限个任务,DelayedWorkQueue也是一个无界队列。
使用newScheduledThreadPool创建的线程池除了可以处理普通的Runnable任务外,它还具有调度的功能:
- 延时指定时间后执行:
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
// 延时5秒执行
executorService.schedule(() -> System.out.println("== Hello"), 5, TimeUnit.SECONDS);
2.按指定速率执行:
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
// 按指定速率执行
executorService.scheduleAtFixedRate(()->System.out.println(LocalDateTime.now()),3,5,TimeUnit.SECONDS);
执行结果:
2019-07-08T11:38:20.792
2019-07-08T11:38:25.776
2019-07-08T11:38:30.775
2019-07-08T11:38:35.773
2019-07-08T11:38:40.773
2019-07-08T11:38:45.774
- 按指定时延执行
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
// 按指定时延执行
executorService.scheduleWithFixedDelay(()->System.out.println(LocalDateTime.now()),3,5,TimeUnit.SECONDS);
执行结果:
2019-07-08T11:41:35.225
2019-07-08T11:41:40.228
2019-07-08T11:41:45.228
2019-07-08T11:41:50.230
2019-07-08T11:41:55.231
2019-07-08T11:42:00.232
乍一看,scheduleAtFixedRate和scheduleWithFixedDelay没啥区别,实际它们还是有区别的:
- scheduleAtFixedRate按照固定速率执行任务,比如每5秒执行一个任务,即使上一个任务没有结束,5秒后也会开始处理新的任务;
- scheduleWithFixedDelay按照固定的时延处理任务,比如每延迟5秒执行一个任务,无论上一个任务处理了1秒,1分钟还是1小时,下一个任务总是在上一个任务执行完毕后5秒钟后开始执行。
对于这些线程池工厂方法的使用,阿里巴巴编程规程指出:
image.png
因为这几个线程池理论是都可以接收无限个任务,所以这就有内存溢出的风险。实际上只要我们掌握了ThreadPoolExecutor构造函数7个参数的含义,我们就可以根据不同的业务来创建出符合需求的线程池。一般线程池的创建可以参考如下规则:
- IO密集型任务,线程池线程数量可以设置为2 X CPU核心数;
- 计算密集型任务,线程池线程数量可以设置为CPU核心数 + 1。
一些API的用法
ThreadPoolExecutor提供了几个判断线程池状态的方法:
public static void main(String[] args) throws InterruptedException{
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 2, 3,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1),
Thread::new, new ThreadPoolExecutor.AbortPolicy());
threadPoolExecutor.execute(() -> {
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
threadPoolExecutor.shutdown();
System.out.println("=== 线程为shutdown状态: " + threadPoolExecutor.isShutdown());
System.out.println("=== 线程池正在关闭: " + threadPoolExecutor.isTerminating());
System.out.println("=== 线程池已经关闭: " + threadPoolExecutor.isTerminated());
// 阻塞当前线程,等待线程池关闭,timeout用于指定等待时间。
threadPoolExecutor.awaitTermination(56,TimeUnit.SECONDS);
System.out.println("=== 线程池已经关闭: " + threadPoolExecutor.isTerminated());
}
程序输出如下:
=== 线程为shutdown状态: true
=== 线程池正在关闭: true
=== 线程池已经关闭: false
=== 线程池已经关闭: true
前面我们提到,线程池核心线程即使是空闲状态也不会被销毁,除非使用allowCoreThreadTimeOut设置了允许核心线程超时:
public static void main(String[] args) throws InterruptedException{
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
1, 2, 3,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1),
Thread::new, new ThreadPoolExecutor.AbortPolicy());
threadPoolExecutor.allowCoreThreadTimeOut(true);
threadPoolExecutor.execute(() -> {
try {
TimeUnit.SECONDS.sleep(5);
System.out.println("=== 任务执行完成");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
5秒后任务执行完毕,核心线程处于空闲的状态。因为通过allowCoreThreadTimeOut方法设置了允许核心线程超时,所以3秒后(keepAliveTime设置为3秒),核心线程被销毁。核心线程被销毁后,线程池也就没有作用了,于是就自动关闭了。
值得注意的是,如果一个线程池调用了allowCoreThreadTimeOut(true)方法,那么它的keepAliveTime不能为0。
ThreadPoolExecutor提供了一remove方法,查看其源码:
public boolean remove(Runnable task) {
boolean removed = workQueue.remove(task);
tryTerminate(); // In case SHUTDOWN and now empty
return removed;
}
可看到,它删除的是线程队列中的任务,而非正在被执行的任务。举个例子:
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
1, 2, 3,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1),
Thread::new, new ThreadPoolExecutor.AbortPolicy());
threadPoolExecutor.allowCoreThreadTimeOut(true);
threadPoolExecutor.execute(() -> {
try {
TimeUnit.SECONDS.sleep(5);
System.out.println("=== 任务执行完成");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
Runnable runnable = () -> System.out.println("=== 看看我是否被删除");
threadPoolExecutor.execute(runnable);
threadPoolExecutor.remove(runnable);
threadPoolExecutor.shutdown();
}
程序输出:
image.png
可看到任务并没有被执行,已经被删除,因为唯一一个核心线程已经在执行任务了,所以后提交的这个任务被放到了线程队列里,然后通过remove方法删除。
默认情况下,只有当往线程池里提交了任务后,线程池才会启动核心线程处理任务。我们可以通过调用prestartCoreThread方法,让核心线程即使没有任务提交,也处于等待执行任务的活跃状态:
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
2, 2, 3,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1),
Thread::new, new ThreadPoolExecutor.AbortPolicy());
System.out.println("=== 活跃线程个数: " + threadPoolExecutor.getActiveCount());
threadPoolExecutor.prestartCoreThread();
System.out.println("=== 活跃线程个数: " + threadPoolExecutor.getActiveCount());
threadPoolExecutor.prestartCoreThread();
System.out.println("=== 活跃线程个数: " + threadPoolExecutor.getActiveCount());
threadPoolExecutor.prestartCoreThread();
System.out.println("=== 活跃线程个数: " + threadPoolExecutor.getActiveCount());
}
程序输出:
···
=== 活跃线程个数: 0
=== 活跃线程个数: 1
=== 活跃线程个数: 2
=== 活跃线程个数: 2
···
该方法返回boolean类型值,如果所以核心线程都启动了,返回false,反之返回true。
还有一个和它类似的prestartAllCoreThreads方法,它的作用是一次性启动所有核心线程,让其处于活跃地等待执行任务的状态。
ThreadPoolExecutor的invokeAny方法用于随机执行任务集合中的某个任务,并返回执行结果,该方法是同步方法:
public static void main(String[] args) throws Exception{
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
2, 5, 3,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1),
Thread::new, new ThreadPoolExecutor.AbortPolicy());
// 任务集合
List<Callable<Integer>> tasks = IntStream.range(0, 4).boxed().map(i -> (Callable<Integer>) () -> {
TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(5));
return i;
}).collect(Collectors.toList());
// 随机执行结果
Integer result = threadPoolExecutor.invokeAny(tasks);
System.out.println("------------------");
System.out.println(result);
threadPoolExecutor.shutdown();
}
程序输出:
------------------
0
ThreadPoolExecutor的invokeAll则是执行任务集合中的所有任务,返回Future集合:
public static void main(String[] args) throws Exception {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
2, 5, 3,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1),
Thread::new, new ThreadPoolExecutor.AbortPolicy());
// 任务集合
List<Callable<Integer>> tasks = IntStream.range(0, 4).boxed().map(i -> (Callable<Integer>) () -> {
TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(5));
return i;
}).collect(Collectors.toList());
List<Future<Integer>> futures = threadPoolExecutor.invokeAll(tasks);
futures.stream().map(f -> {
try {
return f.get();
} catch (InterruptedException | ExecutionException e) {
return null;
}
}).forEach(System.out::println);
threadPoolExecutor.shutdownNow();
}
程序输出如下:
0
1
2
3
总结如下方法:
方法 | 描述 |
---|---|
allowCoreThreadTimeOut(boolean value) | 是否允许核心线程空闲后超时,是的话超时后核心线程将销毁,线程池自动关闭 |
awaitTermination(long timeout, TimeUnit unit) | 阻塞当前线程,等待线程池关闭,timeout用于指定等待时间。 |
execute(Runnable command) | 向线程池提交任务,没有返回值 |
submit(Runnable task) | 向线程池提交任务,返回Future |
isShutdown() | 判断线程池是否为shutdown状态 |
isTerminating() | 判断线程池是否正在关闭 |
isTerminated() | 判断线程池是否已经关闭 |
remove(Runnable task) | 移除线程队列中的指定任务 |
prestartCoreThread() | 提前让一个核心线程处于活跃状态,等待执行任务 |
prestartAllCoreThreads() | 提前让所有核心线程处于活跃状态,等待执行任务 |
getActiveCount() | 获取线程池活跃线程数 |
getCorePoolSize() | 获取线程池核心线程数 |
threadPoolExecutor.getQueue() | 获取线程池线程队列 |
getMaximumPoolSize() | 获取线程池最大线程数 |
shutdown() | 让线程池处于shutdown状态,不再接收任务,等待所有正在运行中的任务结束后,关闭线程池。 |
shutdownNow() | 让线程池处于stop状态,不再接受任务,尝试打断正在运行中的任务,并关闭线程池,返回线程队列中的任务。 |
网友评论