Using Spring Async and Inheriting MDC

Author: 十毛tenmao | Published: 2019-07-19 23:10

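    Many tasks in a project, such as sending notifications, can run asynchronously. Spring's @Async annotation makes asynchronous calls quick to set up. To make request logs easier to trace, a traceId is usually written to the logs through MDC. However, MDC data is not passed along when execution moves to another thread, so a custom task executor is needed.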

    Enabling Async


    Configuring Async in Spring Boot

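    • Add the @EnableAsync annotation
    import javax.annotation.Resource; // jakarta.annotation.Resource on Spring Boot 3+
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.boot.ApplicationArguments;
    import org.springframework.boot.ApplicationRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.scheduling.annotation.EnableAsync;
    
    @Slf4j
    @EnableAsync // turns on processing of @Async methods
    @SpringBootApplication
    public class AsyncApplication implements ApplicationRunner {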
        @Resource
        private PersonManager personManager;
    
        public static void main(String[] args) {
            SpringApplication.run(AsyncApplication.class, args);
        }
    
        @Override
        public void run(ApplicationArguments args) throws Exception {
            log.info("run application");
            personManager.sayHello();
        }
    }
    

    Asynchronous method

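    • Add the @Async annotation
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.scheduling.annotation.Async;
    import org.springframework.stereotype.Component;
    
    @Slf4j
    @Component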
    public class PersonManager {
        @Async
        public void sayHello() {
            log.info("Hello World!");
        }
    }
    

    Output

    [2019-07-19 21:23:21.823][main][INFO][AsyncApplication:33][][]: run application
    [2019-07-19 21:23:21.824][main][DEBUG][DefaultSingletonBeanRegistry:213][][]: Creating shared instance of singleton bean 'applicationTaskExecutor'
    [2019-07-19 21:23:21.824][main][DEBUG][ConstructorResolver:777][][]: Autowiring by type from bean name 'applicationTaskExecutor' via factory method to bean named 'taskExecutorBuilder'
    [2019-07-19 21:23:21.830][main][INFO][ExecutorConfigurationSupport:171][][]: Initializing ExecutorService 'applicationTaskExecutor'
    [2019-07-19 21:23:21.835][task-1][INFO][PersonManager:16][][]: Hello World!
    

    The log shows that sayHello ran on thread task-1 of the task executor applicationTaskExecutor, not on the main thread.
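
    The same effect can be reproduced without Spring. The following is a minimal sketch using only the JDK; the class AsyncThreadDemo, the method workerThreadName, and the single-thread pool are hypothetical stand-ins for the executor Spring uses behind @Async, not Spring's actual implementation.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class AsyncThreadDemo {

    /** Runs a task on a single-thread pool and returns the name of the thread that ran it. */
    static String workerThreadName() throws Exception {
        // Hypothetical stand-in for the task executor; the thread is named "task-1"
        // only to mirror the log output above.
        ExecutorService executor = Executors.newSingleThreadExecutor(
                runnable -> new Thread(runnable, "task-1"));
        try {
            Callable<String> task = () -> Thread.currentThread().getName();
            Future<String> name = executor.submit(task);
            return name.get(); // wait for the worker to finish
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("caller: " + Thread.currentThread().getName());
        System.out.println("worker: " + workerThreadName());
    }
}
```

    The task reports the worker thread's name rather than the caller's, which is the behaviour the log above shows for sayHello.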

    Customizing the Async thread pool

    @Bean
    public AsyncTaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setThreadNamePrefix("Anno-Executor");
        executor.setMaxPoolSize(10);
    
        return executor;
    }
    
    • Output
    [2019-07-19 21:25:17.952][main][INFO][AsyncApplication:33][][]: run application
    [2019-07-19 21:25:17.958][Anno-Executor1][INFO][PersonManager:16][][]: Hello World!
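
    One caveat about the bean above: it sets only the maximum pool size. ThreadPoolTaskExecutor wraps a java.util.concurrent.ThreadPoolExecutor, which creates threads beyond the core size only once its queue is full. With the default core size of 1 and an unbounded queue, a maximum of 10 would rarely come into play. The sketch below illustrates this with the plain JDK class; PoolSizingDemo and threadsCreated are hypothetical names introduced for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolSizingDemo {

    /** Submits blocking tasks and reports how many worker threads the pool created. */
    static int threadsCreated(int corePoolSize, int maxPoolSize, int queueCapacity, int tasks)
            throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                corePoolSize, maxPoolSize, 60, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(queueCapacity));
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // keep every worker busy until released
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return pool.getPoolSize();
        } finally {
            release.countDown(); // let the blocked tasks finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Unbounded queue: extra tasks wait in the queue, the pool stays at core size.
        System.out.println(threadsCreated(1, 10, Integer.MAX_VALUE, 5));
        // Small queue: once it is full, threads are added up to the maximum.
        System.out.println(threadsCreated(1, 10, 2, 5));
    }
}
```

    If the pool is meant to grow to 10 threads, setting the core pool size or a bounded queue capacity alongside the maximum is worth considering.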
    

    A custom ThreadPoolTaskExecutor that inherits MDC


    When MDC is used in logs for call-chain tracing (with a traceId), the MDC data is lost on asynchronous calls, so the MdcThreadPoolTaskExecutor below is recommended.

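    import java.util.Map;
    import java.util.concurrent.Callable;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    import org.slf4j.MDC;
    import org.springframework.lang.NonNull;
    import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
    
    public class MdcThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {
    
        // When true, every task gets fixedContext instead of the submitter's MDC.
        private final boolean useFixedContext;
        private final Map<String, String> fixedContext;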
    
        /**
         * Pool where task threads take MDC from the submitting thread.
         */
        public static MdcThreadPoolTaskExecutor newWithInheritedMdc(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                                                                    TimeUnit unit, int queueCapacity) {
            return new MdcThreadPoolTaskExecutor(null, corePoolSize, maximumPoolSize, keepAliveTime, unit, queueCapacity);
        }
    
        private MdcThreadPoolTaskExecutor(Map<String, String> fixedContext, int corePoolSize, int maximumPoolSize,
                                          long keepAliveTime, TimeUnit unit, int queueCapacity) {
            setCorePoolSize(corePoolSize);
            setMaxPoolSize(maximumPoolSize);
            setKeepAliveSeconds((int) unit.toSeconds(keepAliveTime));
            setQueueCapacity(queueCapacity);
            this.fixedContext = fixedContext;
            useFixedContext = (fixedContext != null);
        }
    
        private Map<String, String> getContextForTask() {
            return useFixedContext ? fixedContext : MDC.getCopyOfContextMap();
        }
    
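        /**
         * Runs the command with the MDC context captured on the submitting thread.
         * {@code ThreadPoolTaskExecutor}'s {@code submit()} methods go to the underlying
         * {@code ThreadPoolExecutor} rather than through this method, so they are overridden below.
         * Other submission methods such as {@code submitListenable()} are not wrapped here.
         */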
        @Override
        public void execute(@NonNull Runnable command) {
            super.execute(wrap(command, getContextForTask()));
        }
    
        @NonNull
        @Override
        public Future<?> submit(@NonNull Runnable task) {
            return super.submit(wrap(task, getContextForTask()));
        }
    
        @NonNull
        @Override
        public <T> Future<T> submit(@NonNull Callable<T> task) {
            return super.submit(wrap(task, getContextForTask()));
        }
    
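        /** Runs the task with the given MDC context, then restores the worker thread's previous context. */
        private static <T> Callable<T> wrap(final Callable<T> task, final Map<String, String> context) {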
            return () -> {
                Map<String, String> previous = MDC.getCopyOfContextMap();
                if (context == null) {
                    MDC.clear();
                } else {
                    MDC.setContextMap(context);
                }
                try {
                    return task.call();
                } finally {
                    if (previous == null) {
                        MDC.clear();
                    } else {
                        MDC.setContextMap(previous);
                    }
                }
            };
        }
    
        private static Runnable wrap(final Runnable runnable, final Map<String, String> context) {
            return () -> {
                Map<String, String> previous = MDC.getCopyOfContextMap();
                if (context == null) {
                    MDC.clear();
                } else {
                    MDC.setContextMap(context);
                }
                try {
                    runnable.run();
                } finally {
                    if (previous == null) {
                        MDC.clear();
                    } else {
                        MDC.setContextMap(previous);
                    }
                }
            };
        }
    }
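
    The capture-and-restore idea behind wrap can be seen in isolation with plain JDK classes. The following is a minimal sketch, assuming a ThreadLocal named TRACE_ID as a hypothetical stand-in for MDC; ContextPropagationDemo, wrap, and observe are illustrative names and not part of SLF4J or Spring.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ContextPropagationDemo {

    // Hypothetical stand-in for MDC: one value per thread.
    static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();

    /** Captures the caller's value now and installs it around the task on the worker thread. */
    static <T> Callable<T> wrap(Callable<T> task) {
        String captured = TRACE_ID.get(); // evaluated on the submitting thread
        return () -> {
            String previous = TRACE_ID.get(); // evaluated on the worker thread
            TRACE_ID.set(captured);
            try {
                return task.call();
            } finally {
                TRACE_ID.set(previous); // leave the pooled thread as it was found
            }
        };
    }

    /** Returns what the worker saw: [without wrapping, with wrapping]. */
    static String[] observe(String traceId) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        TRACE_ID.set(traceId);
        try {
            Callable<String> readTraceId = TRACE_ID::get;
            String plain = pool.submit(readTraceId).get();
            String wrapped = pool.submit(wrap(readTraceId)).get();
            return new String[] {plain, wrapped};
        } finally {
            TRACE_ID.remove();
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        String[] seen = observe("trace-123");
        System.out.println("plain: " + seen[0] + ", wrapped: " + seen[1]);
    }
}
```

    Without wrapping, the worker sees no value because the thread-local belongs to the submitting thread. With wrapping, the value captured at submission time is visible while the task runs and is cleared again afterwards, so pooled threads do not leak one request's context into the next.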
    

    Using MdcThreadPoolTaskExecutor

    @Slf4j
    @EnableAsync
    @SpringBootApplication
    public class AsyncApplication implements ApplicationRunner {
        @Resource
        private PersonManager personManager;
    
        public static void main(String[] args) {
            SpringApplication.run(AsyncApplication.class, args);
        }
    
        @Override
        public void run(ApplicationArguments args) throws Exception {
            MDC.put("traceId", UUID.randomUUID().toString());
            log.info("run application");
            personManager.sayHello();
        }
    
        @Bean
        public AsyncTaskExecutor taskExecutor() {
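            // core = 8, max = 32, keep-alive = 1 minute, queue capacity = 1000
            ThreadPoolTaskExecutor executor = MdcThreadPoolTaskExecutor.newWithInheritedMdc(8, 32, 1, TimeUnit.MINUTES, 1000);
            executor.setThreadNamePrefix("Anno-Executor");
            // Note: this overrides the maximum pool size of 32 passed above.
            executor.setMaxPoolSize(10);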
    
            return executor;
        }
    }
    
    • Output
    [2019-07-19 21:29:58.567][main][INFO][AsyncApplication:32][][07570316-f690-44c5-adb6-dc69c097323f]: run application
    [2019-07-19 21:29:58.575][Anno-Executor1][INFO][PersonManager:16][][07570316-f690-44c5-adb6-dc69c097323f]: Hello World!
    

    The traceId is now passed on to thread Anno-Executor1 as well.


    Link to this article: https://www.haomeiwen.com/subject/redelctx.html