项目中很多任务都可以异步完成,比如消息通知等。 可以借用Spring Async注解,可以很快的实现异步调用。另外为了方便跟踪请求日志,一般会借助MDC在日志中输出traceId,但是跨线程执行的时候的,MDC信息并不会传递,所以需要自定义线程执行器。
启用Async
Spring Boot配置Async
- 添加注解
@EnableAsync
@Slf4j
@EnableAsync
@SpringBootApplication
public class AsyncApplication implements ApplicationRunner {
@Resource
private PersonManager personManager;
public static void main(String[] args) {
SpringApplication.run(AsyncApplication.class, args);
}
@Override
public void run(ApplicationArguments args) throws Exception {
log.info("run application");
personManager.sayHello();
}
}
异步接口
- 添加注解
@Async
@Slf4j
@Component
public class PersonManager {
@Async
public void sayHello() {
log.info("Hello World!");
}
}
执行结果
[2019-07-19 21:23:21.823][main][INFO][AsyncApplication:33][][]: run application
[2019-07-19 21:23:21.824][main][DEBUG][DefaultSingletonBeanRegistry:213][][]: Creating shared instance of singleton bean 'applicationTaskExecutor'
[2019-07-19 21:23:21.824][main][DEBUG][ConstructorResolver:777][][]: Autowiring by type from bean name 'applicationTaskExecutor' via factory method to bean named 'taskExecutorBuilder'
[2019-07-19 21:23:21.830][main][INFO][ExecutorConfigurationSupport:171][][]: Initializing ExecutorService 'applicationTaskExecutor'
[2019-07-19 21:23:21.835][task-1][INFO][PersonManager:16][][]: Hello World!
执行日志中可以看到sayHello
函数是在任务执行器applicationTaskExecutor
的线程task-1
执行的,不是main
线程
自定义Async线程池
@Bean
public AsyncTaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setThreadNamePrefix("Anno-Executor");
executor.setMaxPoolSize(10);
return executor;
}
- 执行结果
[2019-07-19 21:25:17.952][main][INFO][AsyncApplication:33][][]: run application
[2019-07-19 21:25:17.958][Anno-Executor1][INFO][PersonManager:16][][]: Hello World!
自定义MDC可继承的ThreadPoolTaskExecutor
当我们在日志中使用MDC实现调用链路跟踪时(使用traceId),如果异步调用,则会丢失MDC信息。所以建议使用下面的MdcThreadPoolTaskExecutor
public class MdcThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {
final private boolean useFixedContext;
final private Map<String, String> fixedContext;
/**
* Pool where task threads take MDC from the submitting thread.
*/
public static MdcThreadPoolTaskExecutor newWithInheritedMdc(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, int queueCapacity) {
return new MdcThreadPoolTaskExecutor(null, corePoolSize, maximumPoolSize, keepAliveTime, unit, queueCapacity);
}
private MdcThreadPoolTaskExecutor(Map<String, String> fixedContext, int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit, int queueCapacity) {
setCorePoolSize(corePoolSize);
setMaxPoolSize(maximumPoolSize);
setKeepAliveSeconds((int) unit.toSeconds(keepAliveTime));
setQueueCapacity(queueCapacity);
this.fixedContext = fixedContext;
useFixedContext = (fixedContext != null);
}
private Map<String, String> getContextForTask() {
return useFixedContext ? fixedContext : MDC.getCopyOfContextMap();
}
/**
* All executions will have MDC injected. {@code ThreadPoolExecutor}'s submission methods ({@code submit()} etc.)
* all delegate to this.
*/
@Override
public void execute(@NonNull Runnable command) {
super.execute(wrap(command, getContextForTask()));
}
@NonNull
@Override
public Future<?> submit(@NonNull Runnable task) {
return super.submit(wrap(task, getContextForTask()));
}
@NonNull
@Override
public <T> Future<T> submit(@NonNull Callable<T> task) {
return super.submit(wrap(task, getContextForTask()));
}
private static <T> Callable<T> wrap(final Callable<T> task, final Map<String, String> context) {
return () -> {
Map<String, String> previous = MDC.getCopyOfContextMap();
if (context == null) {
MDC.clear();
} else {
MDC.setContextMap(context);
}
try {
return task.call();
} finally {
if (previous == null) {
MDC.clear();
} else {
MDC.setContextMap(previous);
}
}
};
}
private static Runnable wrap(final Runnable runnable, final Map<String, String> context) {
return () -> {
Map<String, String> previous = MDC.getCopyOfContextMap();
if (context == null) {
MDC.clear();
} else {
MDC.setContextMap(context);
}
try {
runnable.run();
} finally {
if (previous == null) {
MDC.clear();
} else {
MDC.setContextMap(previous);
}
}
};
}
}
使用MdcThreadPoolTaskExecutor
@Slf4j
@EnableAsync
@SpringBootApplication
public class AsyncApplication implements ApplicationRunner {
@Resource
private PersonManager personManager;
public static void main(String[] args) {
SpringApplication.run(AsyncApplication.class, args);
}
@Override
public void run(ApplicationArguments args) throws Exception {
MDC.put("traceId", UUID.randomUUID().toString());
log.info("run application");
personManager.sayHello();
}
@Bean
public AsyncTaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = MdcThreadPoolTaskExecutor.newWithInheritedMdc(8, 32, 1, TimeUnit.MINUTES, 1000);
executor.setThreadNamePrefix("Anno-Executor");
executor.setMaxPoolSize(10);
return executor;
}
}
- 执行结果
[2019-07-19 21:29:58.567][main][INFO][AsyncApplication:32][][07570316-f690-44c5-adb6-dc69c097323f]: run application
[2019-07-19 21:29:58.575][Anno-Executor1][INFO][PersonManager:16][][07570316-f690-44c5-adb6-dc69c097323f]: Hello World!
可以看到traceId
也传递到线程Anno-Executor1
了
网友评论