SpringBoot Thread Pools: ThreadPoolExecutor


Author: jeffrey_hjf | Published: 2019-08-26 19:51

    Article on the SpringBoot @Async annotation: SpringBoot Asynchronous Calls with @Async
    Article on the SpringBoot ThreadPoolTaskExecutor thread pool: SpringBoot Thread Pools: ThreadPoolTaskExecutor

    ThreadPoolExecutor is the thread pool implementation in the JDK's java.util.concurrent (JUC) package.
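
    To make the constructor parameters concrete before any Spring wiring is involved, here is a minimal sketch that uses only the JDK. The class name PlainPoolDemo and the pool sizes are illustrative assumptions, not values from this article.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; the sizes below are arbitrary illustration values.
class PlainPoolDemo {

    /** Submits one task to a small pool and returns the task's result. */
    static int runOnPool() throws Exception {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2,                              // corePoolSize
                4,                              // maximumPoolSize
                5, TimeUnit.SECONDS,            // keep-alive for idle threads above the core size
                new ArrayBlockingQueue<>(16));  // bounded work queue
        try {
            Future<Integer> result = pool.submit(() -> 6 * 7);
            return result.get(1, TimeUnit.SECONDS);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runOnPool());
    }
}
```

    The Spring configuration later in this article passes the same kind of arguments, plus a thread factory and a rejection handler.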

    Implementing a ThreadPoolExecutor thread pool in SpringBoot

    Service layer
    1. Create a service-layer interface, AsyncService, as follows:
    public interface AsyncService {
        /**
         * Runs the first asynchronous task.
         */
        void executeAsync1() throws InterruptedException;
    
        /**
         * Runs the second asynchronous task.
         */
        void executeAsync2() throws InterruptedException;
    }
    
    2. The corresponding implementation, AsyncServiceImpl:
    import lombok.extern.java.Log; // assumed: @Log is Lombok's java.util.logging annotation
    import org.springframework.scheduling.annotation.Async;
    import org.springframework.stereotype.Service;

    import java.util.concurrent.TimeUnit;

    /**
     * Service whose methods run asynchronously on the "asyncExecutor" pool.
     * @author jeffrey_hjf
     */
    @Service
    @Log
    public class AsyncServiceImpl implements AsyncService {
    
        @Override
        @Async("asyncExecutor")
        public void executeAsync1() throws InterruptedException {
            System.out.println("MsgServer send A thread name->" + Thread.currentThread().getName());
            long startTime = System.currentTimeMillis();
            // Simulate two seconds of slow work.
            TimeUnit.SECONDS.sleep(2);
    
            long endTime = System.currentTimeMillis();
            System.out.println("MsgServer send A elapsed (ms): " + (endTime - startTime));
        }
    
        @Override
        @Async("asyncExecutor")
        public void executeAsync2() throws InterruptedException {
            System.out.println("MsgServer send B thread name->" + Thread.currentThread().getName());
            long startTime = System.currentTimeMillis();
            // Simulate two seconds of slow work.
            TimeUnit.SECONDS.sleep(2);
            long endTime = System.currentTimeMillis();
            System.out.println("MsgServer send B elapsed (ms): " + (endTime - startTime));
        }
    }
    
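    Thread pool configuration

    Create a configuration class, ThreadPoolExecutorConfig, that defines how the ThreadPoolExecutor is built. It needs both the @Configuration and @EnableAsync annotations: the first marks it as a configuration class, and the second turns on @Async processing so that the pool is actually used. It looks like this: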

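    import com.google.common.util.concurrent.ThreadFactoryBuilder;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.scheduling.annotation.EnableAsync;

    import javax.annotation.PostConstruct; // jakarta.annotation.PostConstruct on Spring Boot 3+
    import java.util.concurrent.Executor;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadFactory;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    /**
     * @author jeffrey_hjf
     */
    @Configuration
    @EnableAsync
    public class ThreadPoolExecutorConfig {
        /**
         * Number of processors available to the JVM, plus 1.
         */
        private static final int THREADS = Runtime.getRuntime().availableProcessors() + 1;
    
        // Note: values injected through @Value replace the initializers below, and a
        // placeholder without a default must be defined in the application configuration.
        @Value("${async.executor.thread.core_pool_size}")
        private int corePoolSize = THREADS;
        @Value("${async.executor.thread.max_pool_size}")
        private int maxPoolSize = 2 * THREADS;
        @Value("${async.executor.thread.queue_capacity}")
        private int queueCapacity = 1024;
        @Value("${async.executor.thread.name.prefix}")
        private String namePrefix = "async-service-";
    
        private ThreadFactory threadFactory;
    
        /**
         * Builds the thread factory after @Value injection. A field initializer would run
         * before namePrefix is injected and would ignore the configured prefix.
         */
        @PostConstruct
        void initThreadFactory() {
            threadFactory = new ThreadFactoryBuilder()
                    // Do not drop the %d: it is replaced by the thread's sequence number.
                    .setNameFormat(namePrefix + "%d")
                    .setDaemon(true)
                    .build();
        }
    
        /**
         * @return the executor referenced by @Async("asyncExecutor")
         */
        @Bean("asyncExecutor")
        public Executor asyncExecutor() {
            return new ThreadPoolExecutor(corePoolSize, maxPoolSize,
                    5, TimeUnit.SECONDS,
                    new LinkedBlockingQueue<>(queueCapacity),
                    threadFactory, (r, executor) -> {
                        // Rejection handler: log, add monitoring, and so on.
                        System.out.println("task is rejected!");
                    });
        }
    }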
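
    The interaction between the core size, the maximum size, the queue capacity, and the rejection handler in the configuration above can be sketched with the JDK alone. The class RejectionDemo and the tiny sizes (core 1, max 2, queue 1) are assumptions chosen so that the effect is easy to see. With a bounded queue, ThreadPoolExecutor only creates threads beyond the core size once the queue is full, and only calls the handler once the maximum size is also reached.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; sizes are deliberately tiny.
class RejectionDemo {

    /** Submits blocking tasks to a pool with core=1, max=2, queue=1 and counts rejections. */
    static int countRejected(int tasks) throws InterruptedException {
        AtomicInteger rejected = new AtomicInteger();
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 5, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                // Same role as the handler lambda in the configuration class.
                (r, executor) -> rejected.incrementAndGet());
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // keep the worker busy until released
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return rejected.get();
        } finally {
            release.countDown();
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        }
    }
}
```

    With five tasks, the first occupies the core thread, the second waits in the queue, the third forces a second thread, and the last two are handed to the rejection handler. This is also why a large queue_capacity such as 1024 means max_pool_size is rarely reached in practice.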
    
    Controller layer

    Create a controller, UserController, that defines an HTTP endpoint which calls the service-layer methods, as follows:

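    import lombok.extern.java.Log; // assumed: @Log is Lombok's java.util.logging annotation
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;

    /**
     * @ClassName UserController
     * @Author jeffrey_hjf
     * @Description User
     **/
    @RestController
    @RequestMapping("/api")
    @Log
    public class UserController {
    
        @Autowired
        private AsyncService asyncService;
    
        /**
         * Runs both service methods on the ThreadPoolExecutor pool.
         * @return a fixed response, sent without waiting for the tasks
         */
        @GetMapping("/executeThreadPoolExecutor")
        public String executeThreadPoolExecutor() throws Exception {
            System.out.println("main thread name -->" + Thread.currentThread().getName());
            // Both calls return immediately; the work runs on pool threads.
            asyncService.executeAsync1();
            asyncService.executeAsync2();
            return "Hello World";
        }
    }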
    
    Result

    The console shows the following log:

    main thread name -->http-nio-9090-exec-1
    MsgServer send A thread name->async-service-0
    MsgServer send B thread name->async-service-1
    MsgServer send A elapsed (ms): 2000
    MsgServer send B elapsed (ms): 2000
    

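    As the log shows, the controller runs on "http-nio-9090-exec-1", which is a Tomcat request thread, while the service-layer output comes from threads named "async-service-0" and "async-service-1", so the service methods are clearly running in the pool we configured. The two tasks run in parallel and each takes about 2000 ms, but the controller does not wait for them: every request returns promptly, and the slow work is left to the pool threads to run asynchronously.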
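
    The thread names in the log come from the thread factory. As a rough sketch of what the name format does, the following JDK-only example uses a hand-written factory in place of Guava's ThreadFactoryBuilder (a deliberate substitution so the snippet has no dependencies). NamedThreadDemo and namedFactory are hypothetical names.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; stands in for the Guava-based factory used in the article.
class NamedThreadDemo {

    /** Returns a factory that names daemon threads prefix0, prefix1, ... */
    static ThreadFactory namedFactory(String prefix) {
        AtomicInteger counter = new AtomicInteger();
        return runnable -> {
            Thread thread = new Thread(runnable, prefix + counter.getAndIncrement());
            thread.setDaemon(true);
            return thread;
        };
    }

    /** Runs one task on a single-thread pool and reports which thread executed it. */
    static String workerThreadName() throws Exception {
        ExecutorService pool = new ThreadPoolExecutor(
                1, 1, 5, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(8),
                namedFactory("async-service-"));
        try {
            Callable<String> task = () -> Thread.currentThread().getName();
            return pool.submit(task).get(1, TimeUnit.SECONDS);
        } finally {
            pool.shutdown();
        }
    }
}
```

    The task reports the name of the pool thread, not the name of the thread that submitted it, which is the same separation the controller and service log lines show.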

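    Extending ThreadPoolExecutor in SpringBoot

    Although we are now using a thread pool, we still cannot see its state at run time: how many threads are executing, and how many tasks are waiting in the queue? To address this, I created a subclass of ThreadPoolExecutor that overrides beforeExecute, afterExecute, and terminated. These hook methods are a convenient place to add logging, timing, monitoring, or statistics collection.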
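
    The two questions above (how many threads are busy, how many tasks are queued) can also be answered directly from ThreadPoolExecutor's own getters. The sketch below is JDK-only; PoolStateDemo and its pool sizes are assumptions for illustration. Note that the JDK documents getActiveCount() as an approximate value; the latches here only make the demo repeatable.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class showing the built-in state getters.
class PoolStateDemo {

    /** Formats a one-line snapshot of the pool's current state. */
    static String snapshot(ThreadPoolExecutor pool) {
        return "poolSize=" + pool.getPoolSize()
                + " active=" + pool.getActiveCount()
                + " queued=" + pool.getQueue().size();
    }

    /** Submits three blocking tasks to a single-thread pool and snapshots it. */
    static String demo() throws InterruptedException {
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10));
        try {
            for (int i = 0; i < 3; i++) {
                pool.execute(() -> {
                    started.countDown();
                    try {
                        release.await(); // hold the worker so the state is stable
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            started.await(); // the first task is now running
            return snapshot(pool);
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }
}
```

    A snapshot like this could be logged from the hook methods or exposed through a monitoring endpoint.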

    The extended ThreadPoolExecutor class
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.RejectedExecutionHandler;
    import java.util.concurrent.ThreadFactory;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicLong;

    public class ThreadPoolExecutorExtend extends ThreadPoolExecutor {
        // Start time of the task currently running on each worker thread.
        private final ThreadLocal<Long> startTime = new ThreadLocal<>();
        // Number of finished tasks and their total run time in milliseconds.
        private final AtomicLong numTasks = new AtomicLong();
        private final AtomicLong totalTime = new AtomicLong();
    
        public ThreadPoolExecutorExtend(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        }
    
        public ThreadPoolExecutorExtend(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
        }
    
        public ThreadPoolExecutorExtend(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
        }
    
        public ThreadPoolExecutorExtend(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
        }
    
        // Runs on the worker thread after each task; records its elapsed time.
        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            try {
                long endTime = System.currentTimeMillis();
                long useTime = endTime - startTime.get();
                numTasks.incrementAndGet();
                totalTime.addAndGet(useTime);
                System.out.println("afterExecute " + r);
            } finally {
                startTime.remove();
                super.afterExecute(r, t);
            }
        }
    
        // Runs on the worker thread just before each task; records its start time.
        @Override
        protected void beforeExecute(Thread t, Runnable r) {
            super.beforeExecute(t, r);
            System.out.println("beforeExecute " + r);
            startTime.set(System.currentTimeMillis());
        }
    
        // Runs once when the pool has fully terminated; prints the average task time.
        @Override
        protected void terminated() {
            try {
                long tasks = numTasks.get();
                long avgTime = tasks == 0 ? 0 : totalTime.get() / tasks;
                System.out.println("terminated: avg time " + avgTime + " ms over " + tasks + " tasks");
            } finally {
                super.terminated();
            }
        }
    }
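
    To see when each hook fires, including terminated(), which only runs after shutdown once all tasks have finished, here is a small JDK-only sketch. HookOrderDemo and RecordingPool are hypothetical names, and the subclass is a stripped-down stand-in for the extended class above, not a copy of it.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class recording the order in which the hooks are called.
class HookOrderDemo {

    /** Minimal subclass that records which hook fired instead of printing. */
    static class RecordingPool extends ThreadPoolExecutor {
        final List<String> events = new CopyOnWriteArrayList<>();

        RecordingPool() {
            super(1, 1, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<>(4));
        }

        @Override
        protected void beforeExecute(Thread t, Runnable r) {
            super.beforeExecute(t, r);
            events.add("before");
        }

        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            events.add("after");
            super.afterExecute(r, t);
        }

        @Override
        protected void terminated() {
            events.add("terminated");
            super.terminated();
        }
    }

    /** Runs one task, shuts the pool down, and returns the recorded events. */
    static List<String> run() throws InterruptedException {
        RecordingPool pool = new RecordingPool();
        pool.execute(() -> pool.events.add("task"));
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return pool.events;
    }
}
```

    In a Spring application the pool bean normally lives as long as the application context, so the terminated() output would only appear if the pool is shut down explicitly.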
    
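    Modifying the ThreadPoolExecutorConfig class

    In ThreadPoolExecutorConfig.java, add an asyncExecutorExtend method that is identical to asyncExecutor except that new ThreadPoolExecutor becomes new ThreadPoolExecutorExtend, as shown below. For tasks to run on this pool, the @Async annotations in AsyncServiceImpl must reference the asyncExecutorExtend bean.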

        @Bean("asyncExecutorExtend")
        public Executor asyncExecutorExtend() {
            return new ThreadPoolExecutorExtend(corePoolSize, maxPoolSize,
                    5, TimeUnit.SECONDS,
                    new LinkedBlockingQueue<>(queueCapacity),
                    threadFactory, (r, executor) -> {
                        // Rejection handler: log, add monitoring, and so on.
                        System.out.println("task is rejected!");
                    });
        }
    
    Result

    The log is as follows:

    main thread name -->http-nio-9090-exec-1
    beforeExecute java.util.concurrent.FutureTask@372c5560
    MsgServer send A thread name->async-service-0
    MsgServer send A elapsed (ms): 2000
    afterExecute java.util.concurrent.FutureTask@372c5560
    


    Article link: https://www.haomeiwen.com/subject/qhodectx.html