Executors 创建线程池
- Executors.newCachedThreadPool:创建可缓存无限制数量的线程池,如果线程中没有空闲线程池的话此时再来任务会新建线程,如果超过60秒此线程无用,那么就会将此线程销毁。简单来说就是忙不来的时候无限制创建临时线程,闲下来再回收
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
- Executors.newFixedThreadPool:创建固定大小的线程池,可控制线程最大的并发数,超出的线程会在队列中等待。简单来说就是忙不来的时候会将任务放到无限长度的队列里
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
- Executors.newSingleThreadExecutor:创建线程池中线程数量为1的线程池,用唯一线程来执行任务,保证任务是按照指定顺序执行
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
- Executors.newScheduledThreadPool:创建固定大小的线程池,支持定时及周期性的任务执行
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
- 其实这四种实现方式的源码中我们可以看到其实它们的底层创建原理都是一样的,只不过是所传的参数不同组成的四个不同类型的线程池。都是使用了ThreadPoolExecutor来创建的。我们可以看一下ThreadPoolExecutor创建所传的参数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
- corePoolSize:线程池中核心线程数的数量
- maximumPoolSize:在线程池中允许存在的最大线程数
- keepAliveTime:当存在的线程数大于corePoolSize,那么会找到空闲线程去销毁,此参数是设置空闲多久的线程才被销毁。
- unit:时间单位
- workQueue:工作队列,线程池中的当前线程数大于核心线程的话,那么接下来的任务会放入到队列中
- threadFactory:在创建线程的时候,通过工厂模式来生产线程。这个参数就是设置我们自定义的线程创建工厂。
handler:如果超过了最大线程数,那么就会执行我们设置的拒绝策略
参数处理逻辑
前 corePoolSize 个任务时,来一个任务就创建一个线程,如果当前线程池的线程数大于了 corePoolSize 那么下来再来的任务就会放入到我们上面设置的 workQueue 队列中,如果此时 workQueue 也满了,那么再来任务时,就会新建临时线程,那么此时如果我们设置了 keepAliveTime 或者设置了 allowCoreThreadTimeOut ,那么系统就会进行线程的活性检查,一旦超时便销毁线程,如果此时线程池中的当前线程大于了 maximumPoolSize 最大线程数,那么就会执行我们刚才设置的 handler 拒绝策略
but... 阿里的 Java开发手册建议
【强制】线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,
这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
阿里规约之所以强制要求手动创建线程池,也是和这些参数有关。具体为什么不允许,
规约是这么说的:
线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
Executors各个方法的弊端:
- FixedThreadPool 和 SingleThreadExecutor:这两个线程池的实现方式,我们可以看到它设置的工作队列都是 LinkedBlockingQueue,我们知道此队列是一个链表形式的队列,此队列是没有长度限制的,是一个无界队列,那么此时如果有大量请求,就有可能造成OOM
- CachedThreadPool 和 ScheduledThreadPool:这两个线程池的实现方式,我们可以看到它设置的最大线程数都是Integer.MAX_VALUE,那么就相当于允许创建的线程数量为Integer.MAX_VALUE。此时如果有大量请求来的时候也有可能造成OOM
如何设置参数
- 项目中如果要使用线程池的话,那么就推荐根据自己项目和机器的情况进行个性化创建线程池。那么这些参数如何设置呢?为了正确的定制线程池的长度,需要理解你的计算机配置、所需资源的情况以及任务的特性。比如部署的计算机安装了多少个CPU?多少的内存?任务主要执行是IO密集型还是CPU密集型?所执行任务是否需要数据库连接这样的稀缺资源?
如果你有多个不同类别的任务,它们的行为有很大的差别,那么应该考虑使用多个线程池。这样也能根据每个任务不同定制不同的线程池,也不至于因为一种类型的任务失败而托垮另一个任务。
- CPU密集型任务:说明包含了大量的运算操作,比如有N个CPU,那么就配置线程池的容量大小为N+1,这样能获得最优的利用率。因为CPU密集型的线程恰好在某时因为发生一个页错误或者因为其他的原因而暂停,刚好有一个额外的线程,可以确保在这种情况下CPU周期不会中断工作。
- IO密集任务:说明CPU大部分时间都是在等待IO的阻塞操作,那么此时就可以将线程池的容量大小配置的大一些。此时可以根据一些参数进行计算大概你的线程池的数量多少合适。
- N:CPU的数量
- U:目标CPU的使用率,0<=U<=1
- W/C:等待时间与计算时间的比率
- 那么最优的池的大小就是NU(1+W/C)
- 例如:线程数量=cpu的数量cpu期望利用率(1 + 任务等待时间/任务处理时间)。
比如一个8核CPU,希望这部分工作的CPU使用率20%,任务等待时间允许200ms,每个任务执行10ms。
那么线程数量=8*0.2*(1+200/10)= 33
创建线程的正确方式
public class NewCachedThreadPoolDemo implements Runnable{
public static void main(String[] args) {
/**
* corePoolSize:核心线程数;
* maximumPoolSize:最大线程数,即线程池中允许存在的最大线程数;
* keepAliveTime:线程存活时间,对于超过核心线程数的线程,当线程处理空闲状态下,且维持时间达到keepAliveTime时,线程将被销毁;
* unit:keepAliveTime的时间单位
* workQueue:工作队列,用于存在待执行的线程任务;
* threadFactory:创建线程的工厂,用于标记区分不同线程池所创建出来的线程;
* handler:当到达线程数上限或工作队列已满时的拒绝处理逻辑;
*/
ThreadPoolExecutor executor = new ThreadPoolExecutor(
10,
100,
3L,
TimeUnit.SECONDS,
new LinkedBlockingDeque<Runnable>());
try {
for (int i = 0; i < 20; i++) {
executor.execute(new NewCachedThreadPoolDemo());
}
}finally {
// 关闭线程池
executor.shutdown();
}
}
@Override
public void run() {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+"调用 run 方法");
}
}
线程池的拒绝策略
- ThreadPoolExecutor为我们提供了四种拒绝策略,我们可以看下Java提供的四种线程池创建所提供的拒绝策略都是其定义的默认的拒绝策略
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
- 我们可以到拒绝策略是一个接口RejectedExecutionHandler,这也就意味我着我们可以自己订自己的拒绝策略,我们先看一下Java提供四种拒绝策略是什么
public interface RejectedExecutionHandler {
/**
* Method that may be invoked by a {@link ThreadPoolExecutor} when
* {@link ThreadPoolExecutor#execute execute} cannot accept a
* task. This may occur when no more threads or queue slots are
* available because their bounds would be exceeded, or upon
* shutdown of the Executor.
*
* <p>In the absence of other alternatives, the method may throw
* an unchecked {@link RejectedExecutionException}, which will be
* propagated to the caller of {@code execute}.
*
* @param r the runnable task requested to be executed
* @param executor the executor attempting to execute this task
* @throws RejectedExecutionException if there is no remedy
*/
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
- AbortPolicy
这个拒绝策略就是Java提供的四种线程池创建方法提供的默认拒绝策略。我们可以看下它的实现
public static class AbortPolicy implements RejectedExecutionHandler {
/**
* Creates an {@code AbortPolicy}.
*/
public AbortPolicy() { }
/**
* Always throws RejectedExecutionException.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
* @throws RejectedExecutionException always
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
所以此拒绝策略就是抛RejectedExecutionException异常
- CallerRunsPolicy
此拒绝策略简单来说就是将此任务交给调用者直接执行
public static class CallerRunsPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code CallerRunsPolicy}.
*/
public CallerRunsPolicy() { }
/**
* Executes task r in the caller's thread, unless the executor
* has been shut down, in which case the task is discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
这里为什么是交给了调用者来执行呢?我们可以看到它是调用了run()方法,而不是start()方法
- DiscardOldestPolicy
从源码中应该能看出来,此拒绝策略是丢弃队列中最老的任务,然后再执行
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardOldestPolicy} for the given executor.
*/
public DiscardOldestPolicy() { }
/**
* Obtains and ignores the next task that the executor
* would otherwise execute, if one is immediately available,
* and then retries execution of task r, unless the executor
* is shut down, in which case task r is instead discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
- DiscardPolicy
从源码中应该能看出来,此拒绝策略是对于当前任务不做任何操作,简单来说就是直接丢弃了当前任务不执行
public static class DiscardPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardPolicy}.
*/
public DiscardPolicy() { }
/**
* Does nothing, which has the effect of discarding task r.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
线程池的拒绝策略给我们默认提供了这四种的实现方式,当然我们也能够自定义拒绝策略使线程池更加符合我们当前的业务
springboot 中使用线程池
配置线程池
@Configuration
@EnableAsync
public class ThreadConfig {
@Bean
public Executor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 线程池维护线程的最少数量,即使没有任务需要执行,也会一直存活
// 设置allowCoreThreadTimeout=true(默认false)时,核心线程会超时关闭
executor.setCorePoolSize(5);
// 线程池维护线程的最大数量
// 当线程数>=corePoolSize,且任务队列已满时。线程池会创建新线程来处理任务
// 当线程数=maxPoolSize,且任务队列已满时,线程池会拒绝处理任务而抛出异常
executor.setMaxPoolSize(10);
// 缓存队列(阻塞队列)当核心线程数达到最大时,新任务会放在队列中排队等待执行
executor.setQueueCapacity(20);
// 设置线程活跃时间(秒)
executor.setKeepAliveSeconds(60);
// 设置默认线程名称
executor.setThreadNamePrefix("taskThread-");
// 设置拒绝策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 等待所有任务结束后再关闭线程池
executor.setWaitForTasksToCompleteOnShutdown(true);
// 初始化
executor.initialize();
return executor;
}
}
使用线程池
@Service
public class ThreadService {
@Async
public void run() {
System.out.println(Thread.currentThread().getName()+"调用 run 方法");
}
}
调用
@SpringBootApplication
@Controller
public class ApplicationaStart {
public static void main(String[] args) {
SpringApplication.run(ApplicationaStart.class);
}
@Autowired
private ThreadService t;
@RequestMapping("/test")
@ResponseBody
public Object asyncTest(){
t.run();
System.out.println("哈哈...");
return "hello";
}
}
结果
哈哈...
taskThread-1调用 run 方法
可以看到 哈哈... 先执行了
注意事项
如下方式会使@Async失效
- 异步方法使用static修饰
- 异步类没有使用@Component注解(或其他注解)导致spring无法扫描到异步类
- 异步方法不能与被调用的异步方法在同一个类中
- 类中需要使用@Autowired或@Resource等注解自动注入,不能自己手动new对象
- 如果使用SpringBoot框架必须在启动类中增加@EnableAsync注解
网友评论