import lombok.extern.slf4j.Slf4j;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.lang.reflect.Method;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
/**
* 线程池的配置
*/
@EnableAsync // 异步调用
@Configuration
@Slf4j
public class AsyncConfig implements AsyncConfigurer {
/**
* 创建一个固定大小的线程池
*/
@Bean(name = "fixedThreadPool")
public ExecutorService fixedThreadPool() {
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(corePoolSize);
return fixedThreadPool;
}
@Bean(name = "threadPoolTaskExecutor")
public Executor threadPoolTaskExecutor() {
return this.getAsyncExecutor();
}
// 核心线程数为服务器的cpu核心数
private static int corePoolSize = 5; //Runtime.getRuntime().availableProcessors();
// 线程池中允许的最大线程数
private static int maxPoolSize = 2 * corePoolSize + 1;
// 工作队列大小
private static int queueCapacity = 5000;
/**
* 如果池中任务数 < corePoolSize (核心线程数),创建新线程立即执行任务
* 如果池中任务数 > corePoolSize,新任务放到缓存队列当中等待执行
* 队列满,线程数量<maxPoolSize,新建线程立即执行任务
* 队列满,线程数量>=maxPoolSize,使用拒绝策略拒绝
*/
@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(corePoolSize);// 当前线程数
taskExecutor.setMaxPoolSize(maxPoolSize);// 最大线程数
taskExecutor.setQueueCapacity(queueCapacity);//线程池所使用的缓冲队列
taskExecutor.setWaitForTasksToCompleteOnShutdown(true);//等待任务在关机时完成--表明等待所有线程执行完
taskExecutor.setAwaitTerminationSeconds(60 * 15);// 等待时间 (默认为0,此时立即停止),并没等待xx秒后强制停止
taskExecutor.setThreadNamePrefix("MyAsync-");// 线程名称前缀
// rejection-policy:当pool已经达到max size的时候,如何处理新任务
// 拒绝策略:CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
/*
AbortPolicy:直接抛出异常。
CallerRunsPolicy:只用调用者所在线程来运行任务。
DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务。
DiscardPolicy:不处理,丢弃掉。
*/
taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
taskExecutor.initialize(); // 初始化
return taskExecutor;
}
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return new AsyncUncaughtExceptionHandler() {
@Override
public void handleUncaughtException(Throwable throwable, Method method, Object... objects) {
log.info("Exception message - " + throwable.getMessage());
log.info("Method name - " + method.getName());
for (Object param : objects) {
log.info("Parameter value - " + param);
}
}
};
}
}
网友评论