-
corePoolSize(核心线程数)
- 核心线程会一直存在,即使没有任务执行;
- 当线程数小于核心线程数的时候,即使有空闲线程,也会一直创建线程直到达到核心线程数;
- 设置allowCoreThreadTimeout=true(默认false)时,核心线程会超时关闭。
-
queueCapacity(任务队列容量)
- 也叫阻塞队列,当核心线程都在运行,此时再有任务进来,会进入任务队列,排队等待线程执行。
-
maxPoolSize(最大线程数)
- 线程池里允许存在的最大线程数量;
- 当任务队列已满,且线程数量大于等于核心线程数时,会创建新的线程执行任务;
- 线程池里允许存在的最大线程数量。当任务队列已满,且线程数量大于等于核心线程数时,会创建新的线程执行任务。
-
keepAliveTime(线程空闲时间)
- 当线程空闲时间达到keepAliveTime时,线程会退出(关闭),直到线程数等于核心线程数;
- 如果设置了allowCoreThreadTimeout=true,则线程会退出直到线程数等于零。
- allowCoreThreadTimeout(允许核心线程超时)
- ejectedExecutionHandler(任务拒绝处理器)
- 当线程数量达到最大线程数,且任务队列已满时,会拒绝任务;
- 调用线程池shutdown()方法后,会等待执行完线程池的任务之后,再shutdown()。如果在调用了shutdown()方法和线程池真正shutdown()之间提交任务,会拒绝新任务。
-
任务执行解析
- 如果线程池中线程数量 < 核心线程数,新建一个线程执行任务;
- 如果线程池中线程数量 >= 核心线程数,则将任务放入任务队列
- 如果线程池中线程数量 >= 核心线程数 且 < maxPoolSize,且任务队列满了,则创建新的线程;
- 如果线程池中线程数量 > 核心线程数,当线程空闲时间超过了keepalive时,则会销毁线程;由此可见线程池的队列如果是无界队列,那么设置线程池最大数量是无效的;
- 如果线程池中的任务队列满了,而且线程数达到了maxPoolSize,并且没有空闲的线程可以执行新的任务,这时候再提交任务就会执行拒绝策略
-
rejectedExecutionHandler字段用于配置拒绝策略,常用的拒绝策略如下:
- AbortPolicy 抛出RejectedExecutionException。
- CallerRunsPolicy 直接在execute方法的调用线程中运行被拒绝的任务。
- DiscardOldestPolicy 放弃最旧的未处理请求,然后重试execute。
- DiscardPolicy 下它将丢弃被拒绝的任务。
Spring Boot中异步线程池的配置
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
@Slf4j
@Configuration
public class AsyncExecutorConfig {
@Bean
public Executor asyncServiceExecutor() {
log.info("start asyncServiceExecutor");
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//配置核心线程数
//如果是IO密集型应用,则线程池大小设置为2N+1;
//如果是CPU密集型应用,则线程池大小设置为N+1;
executor.setCorePoolSize(6);
//配置最大线程数
executor.setMaxPoolSize(10);
//配置队列大小
executor.setQueueCapacity(10000);
//配置线程池中的线程的名称前缀
executor.setThreadNamePrefix("async-service-");
// 设置拒绝策略:当pool已经达到max size的时候,如何处理新任务
// CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//执行初始化
executor.initialize();
return executor;
}
}
异步使用方法
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.stereotype.Component;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@Component
@Slf4j
public class AsyncService {
@Async("asyncServiceExecutor")
public Future<String> getAsyncResult1() {
String result = "asyncResultTest1";
try {
Thread.sleep(3000);
log.info("线程名称:{},睡眠:{}秒", Thread.currentThread().getName(), "3");
} catch (InterruptedException e) {
e.printStackTrace();
}
return new AsyncResult<String>(result);
}
@Async("asyncServiceExecutor")
public Future<String> getAsyncResult2() {
String result = "asyncResultTest2";
try {
Thread.sleep(4000);
log.info("线程名称:{},睡眠:{}秒", Thread.currentThread().getName(), "4");
} catch (InterruptedException e) {
e.printStackTrace();
}
return new AsyncResult<String>(result);
}
}
@RestController
@RequestMapping(value = "/test")
public class TestController extends BaseBeanController {
//线程池,上面Config中定义的
@Autowired
public Executor asyncServiceExecutor;
@PostMapping(value = "/test")
public DeferredResult<R> test() throws ExecutionException, InterruptedException {
DeferredResult deferredResult = new DeferredResult(10000L);
CompletableFuture<R> result = CompletableFuture.supplyAsync(() -> {
//耗时方法
}, asyncServiceExecutor);
deferredResult.setResult(result.get());
return deferredResult;
}
}
网友评论