为什么要使用线程池
- 反复创建线程开销大
- 过多的线程会占用太多内存
线程池的好处
- 加快响应速度
- 合理利用CPU和内存
- 统一管理
线程池适合应用的场合
- 服务器接受到大量请求时,使用线程池技术是非常合适的,它可以大大减少线程的创建和销毁次数,提高服务器的工作效率
- 在开发中,如果需要创建5哥以上的线程,那么就可以使用线程池来管理
线程池构造函数的参数
参数名 | 类型 | 含义 |
---|---|---|
corePoolSize | int | 核心线程数 |
maximumPoolSize | int | 最大线程数 |
keepAliveTime | long | 保持存活时间 |
workQueue | BlockingQueue | 任务存储队列 |
threadFactory | ThreadFactory | 当线程池需要新的线程的时候,会使用threadFactory来生成新的线程 |
Handler | RejectedExecutionHandler | 由于线程池无法接受你所提交的任务的拒绝策略 |
corePoolSize
指的是核心线程数,线程池在完成初始化后,默认情况下,线程池中并没有任何线程,线程池会等待有任务到来时,再创建新线程去执行任务
maximumPoolSize
线程池有可能会在核心线程数的基础上,额外增加一些线程,但是这些新增加的线程数有一个上限,这就是maximumPoolSize
keepAliveTime
如果线程池当前的线程数多于corePoolSize,那么如果多余的线程空闲时间超过keepAliveTime,它们就会被终止
ThreadFactory
新的线程都是由ThreadFactory创建的,默认使用Executors.defaultThreadFactory,创建出来的线程都在同一个线程组,拥有同样的NORM_PRIORITY优先级并且都不是守护线程。如果自己指定ThreadFactory,那么就可以改变线程名、线程组、优先级、是否是守护线程等。通常使用默认的ThreadFactory就可以了。
workQueue
有3种最常见的队列类型
1.直接交接:SynchronousQueue
2.无界队列:LinkedBlockingQueue
3.有界队列:ArrayBlockingQueue
线程池添加线程规则
- 如果线程数小于corePoolSize,即使其他工作线程处于空闲状态,也会创建一个新线程来运行新任务
- 如果线程数等于(或大于)corePoolSize但少于maximumPoolSize,则将任务放入队列
- 如果队列已满,并且线程数小于maximumPoolSize,则创建一个新线程来运行任务
- 如果队列已满,并且线程数大于或等于maximumPoolSize,则拒绝该任务
增减线程的特点
- 通过设置corePoolSize和maximumPoolSize相同,就可以创建固定大小的线程池;
- 线程池希望保持较少的线程数,并且只有在负载变得很大时才增加它;
- 通过设置maximumPoolSize为很高的值(例如Integer.MAX_VALUE),可以允许线程池容纳任意数量的并发任务;
- 是只有在队列填满时才创建多于corePoolSize的线程,所以如果你使用的是无界队列(例如LinkedBlockingQueue),那么线程数就不会超过corePoolSize。
线程池应该手动创建还是自动创建
newFixedThreadPool:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
- 通过源码可以看出,newFixedThreadPool 使用的是 LinkedBlockingQueue,由于 LinkedBlockingQueue 是没有容量上限的,所以当请求数越来越多,并且无法及时处理完毕的时候,也就是请求堆积的时候,会容易造成占用大量的内存,可能会导致OOM
newSingleThreadExecutor:
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
- 通过源码可以看出,这里和上面的 newFixedThreadPool 的原理基本一样,只不过是把线程数直接设置成了1,所以这也会导致同样的问题,也就是当请求堆积的时候,可能会占用大量的内存。
newCachedThreadPool:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
- 可缓存线程池
- 特点:无界限线程池,具有自动回收多余线程的功能(默认时间是60秒)
- 弊端:在于第二个参数 maximumPoolSize 被设置为了Integer.MAX_VALUE,这可能会创建数量非常多的线程,甚至导致OOM。
newScheduledThreadPool:
- 支持定时及周期性任务执行的线程池
正确的创建线程池的方法
根据不同的业务场景,选择合适的方式,最后是我们自己手动创建线程池,自己设置线程池参数。
线程池里的线程数量设定为多少比较合适
- CPU密集型(加密、计算hash等):最佳线程数为CPU核心数的 1-2 倍左右
- 耗时IO型(读写数据库、文件、网络读写等):最佳线程数一般会大于CPU核心数很多倍,以JVM线程监控显示繁忙情况为依据,保证线程空闲可以衔接上,参考 Brain Goetz 推荐的计算方法:
线程数=CPU核心数x(1+平均等待时间/平均工作时间)
停止线程池的正确方法
- shutdown
shutdown 并不是立即粗暴的结束线程,线程池仍然会继续执行已创建的任务,但是不会接收新的任务了。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ShutDownDemo {
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 0; i < 1000; i++) {
executorService.execute(new ShutDownTask());
}
Thread.sleep(1500);
executorService.shutdown();
// 1500毫秒后此处会抛出异常 RejectedExecutionException
executorService.execute(new ShutDownTask());
}
}
class ShutDownTask implements Runnable {
@Override
public void run() {
try {
Thread.sleep(500);
System.out.println(Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
上述代码在1.5秒后执行 executorService.shutdown(); 之后,再执行
executorService.execute(new ShutDownTask()); 则会报错抛出异常 RejectedExecutionException
- isShutdown
返回true或false告诉我们线程是否已经停止
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 0; i < 1000; i++) {
executorService.execute(new ShutDownTask());
}
Thread.sleep(1500);
System.out.println(executorService.isShutdown()); // 打印false
executorService.shutdown();
System.out.println(executorService.isShutdown()); // 打印true
System.out.println(executorService.isTerminated());
}
- isTerminated
返回true或false告诉我们线程是否已经完全停止
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 0; i < 1000; i++) {
executorService.execute(new ShutDownTask());
}
Thread.sleep(1500);
executorService.shutdown();
// 打印false,因为线程还没有完全执行完
System.out.println(executorService.isTerminated());
}
- awaitTermination
awaitTermination 有三种情况会返回,没返回之前都是阻塞的
第一种情况:所有任务都执行完毕了
第二种情况:等待的时间到了
第三种情况:等待的过程中被打断了,会抛出 InterruptedException
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 0; i < 1000; i++) {
executorService.execute(new ShutDownTask());
}
Thread.sleep(1500);
boolean b = executorService.awaitTermination(3L, TimeUnit.SECONDS);
// 会打印false,因为3秒钟不够线程全部执行完
System.out.println(b);
}
- shutdownNow
正在执行任务的线程继续执行,等待队列中的线程直接结束并返回
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ShutDownDemo {
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 0; i < 100; i++) {
executorService.execute(new ShutDownTask());
}
// 等待1.5秒
Thread.sleep(1500);
// 这里会返回已经放到线程池队列中还没有执行的Runnable
List<Runnable> runnables = executorService.shutdownNow();
}
}
class ShutDownTask implements Runnable {
@Override
public void run() {
try {
Thread.sleep(500);
System.out.println(Thread.currentThread().getName());
} catch (InterruptedException e) {
// 接收被中断信号
System.out.println(Thread.currentThread().getName() + "被中断了!");
}
}
}
executorService.shutdownNow();会返回已经放到线程池队列中还没有执行的Runnable
运行结果:
...
...
...
pool-1-thread-8
pool-1-thread-7
pool-1-thread-6
pool-1-thread-10
pool-1-thread-1
pool-1-thread-3
pool-1-thread-1被中断了!
pool-1-thread-4
pool-1-thread-2
pool-1-thread-7被中断了!
pool-1-thread-8被中断了!
pool-1-thread-10被中断了!
pool-1-thread-6被中断了!
pool-1-thread-9被中断了!
pool-1-thread-5被中断了!
Process finished with exit code 0
线程池任务太多,怎么拒绝
-
拒绝时机
- 当Executor关闭时,提交新任务会被拒绝
- 以及当Executor对最大线程和工作队列容量使用有限边界并且已经饱和时
-
拒绝策略
- AbortPolicy,直接抛出一个异常
- DiscardPolicy,悄悄的把任务丢弃,没有通知
- DiscardOldestPolicy,把队列中最老的那个任务丢弃,新任务加进来
- CallerRunsPolicy,谁提交的任务谁去执行(比如说主线程给线程池提交了一个任务,但是线程池已经饱和无法再执行了,这时则会让提交任务的主线程去执行这个任务)
线程池钩子函数
import java.util.concurrent.*;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
* 演示每个任务执行前后放钩子函数
*/
public class PauseableThreadPool extends ThreadPoolExecutor {
private boolean isPaused;
private final ReentrantLock lock = new ReentrantLock();
private Condition unpaused = lock.newCondition();
public static void main(String[] args) throws InterruptedException {
PauseableThreadPool pauseableThreadPool = new PauseableThreadPool(10, 20, 10L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>());
Runnable runnable = new Runnable() {
@Override
public void run() {
System.out.println("我被执行了");
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
for (int i = 0; i < 10000; i++) {
pauseableThreadPool.execute(runnable);
}
Thread.sleep(2000);
// 等待2秒,执行暂停方法
pauseableThreadPool.pause();
System.out.println("线程池被暂停了!");
Thread.sleep(2000);
// 再等待2秒,执行恢复方法
pauseableThreadPool.resume();
System.out.println("线程池被恢复了!");
}
@Override
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
lock.lock();
try {
while (isPaused) {
unpaused.await();
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
/**
* 暂停线程
*/
private void pause() {
lock.lock();
try {
isPaused = true;
} finally {
lock.unlock();
}
}
/**
* 恢复线程
*/
private void resume() {
lock.lock();
try {
isPaused = false;
unpaused.signalAll();
} finally {
lock.unlock();
}
}
public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
}
运行结果:
钩子函数执行效果
网友评论