美文网首页
ThreadPoolTaskExecutor小记

ThreadPoolTaskExecutor小记

作者: yellow_han | 来源:发表于2020-03-11 20:51 被阅读0次
    • corePoolSize(核心线程数)

      • 核心线程会一直存在,即使没有任务执行;
      • 当线程数小于核心线程数的时候,即使有空闲线程,也会一直创建线程直到达到核心线程数;
      • 设置allowCoreThreadTimeout=true(默认false)时,核心线程会超时关闭。
    • queueCapacity(任务队列容量)

      • 也叫阻塞队列,当核心线程都在运行,此时再有任务进来,会进入任务队列,排队等待线程执行。
    • maxPoolSize(最大线程数)

      • 线程池里允许存在的最大线程数量;
      • 当任务队列已满,且线程数量大于等于核心线程数时,会创建新的线程执行任务;
      • 线程池里允许存在的最大线程数量。当任务队列已满,且线程数量大于等于核心线程数时,会创建新的线程执行任务。
    • keepAliveTime(线程空闲时间)

      • 当线程空闲时间达到keepAliveTime时,线程会退出(关闭),直到线程数等于核心线程数;
      • 如果设置了allowCoreThreadTimeout=true,则线程会退出直到线程数等于零。
      • allowCoreThreadTimeout(允许核心线程超时)
      • ejectedExecutionHandler(任务拒绝处理器)
      • 当线程数量达到最大线程数,且任务队列已满时,会拒绝任务;
      • 调用线程池shutdown()方法后,会等待执行完线程池的任务之后,再shutdown()。如果在调用了shutdown()方法和线程池真正shutdown()之间提交任务,会拒绝新任务。
    • 任务执行解析

      • 如果线程池中线程数量 < 核心线程数,新建一个线程执行任务;
      • 如果线程池中线程数量 >= 核心线程数,则将任务放入任务队列
      • 如果线程池中线程数量 >= 核心线程数 且 < maxPoolSize,且任务队列满了,则创建新的线程;
      • 如果线程池中线程数量 > 核心线程数,当线程空闲时间超过了keepalive时,则会销毁线程;由此可见线程池的队列如果是无界队列,那么设置线程池最大数量是无效的;
      • 如果线程池中的任务队列满了,而且线程数达到了maxPoolSize,并且没有空闲的线程可以执行新的任务,这时候再提交任务就会执行拒绝策略
    • rejectedExecutionHandler字段用于配置拒绝策略,常用的拒绝策略如下:

      • AbortPolicy 抛出RejectedExecutionException。
      • CallerRunsPolicy 直接在execute方法的调用线程中运行被拒绝的任务。
      • DiscardOldestPolicy 放弃最旧的未处理请求,然后重试execute。
      • DiscardPolicy 下它将丢弃被拒绝的任务。

    Spring Boot中异步线程池的配置

    import lombok.extern.slf4j.Slf4j;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
    import java.util.concurrent.Executor;
    import java.util.concurrent.ThreadPoolExecutor;
    
    @Slf4j
    @Configuration
    public class AsyncExecutorConfig {
    
        @Bean
        public Executor asyncServiceExecutor() {
            log.info("start asyncServiceExecutor");
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            //配置核心线程数
            //如果是IO密集型应用,则线程池大小设置为2N+1;
            //如果是CPU密集型应用,则线程池大小设置为N+1;
            executor.setCorePoolSize(6);
            //配置最大线程数
            executor.setMaxPoolSize(10);
            //配置队列大小
            executor.setQueueCapacity(10000);
            //配置线程池中的线程的名称前缀
            executor.setThreadNamePrefix("async-service-");
            // 设置拒绝策略:当pool已经达到max size的时候,如何处理新任务
            // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
            executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
            //执行初始化
            executor.initialize();
            return executor;
        }
    }
    

    异步使用方法

    import lombok.extern.slf4j.Slf4j;
    import org.springframework.scheduling.annotation.Async;
    import org.springframework.scheduling.annotation.AsyncResult;
    import org.springframework.stereotype.Component;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    
    @Component
    @Slf4j
    public class AsyncService {
    
        @Async("asyncServiceExecutor")
        public Future<String> getAsyncResult1() {
            String result = "asyncResultTest1";
            try {
                Thread.sleep(3000);
                log.info("线程名称:{},睡眠:{}秒", Thread.currentThread().getName(), "3");
    
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return new AsyncResult<String>(result);
        }
    
        @Async("asyncServiceExecutor")
        public Future<String> getAsyncResult2() {
            String result = "asyncResultTest2";
            try {
                Thread.sleep(4000);
                log.info("线程名称:{},睡眠:{}秒", Thread.currentThread().getName(), "4");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return new AsyncResult<String>(result);
        }
    }
    
    @RestController
    @RequestMapping(value = "/test")
    public class TestController extends BaseBeanController {
    
         //线程池,上面Config中定义的
        @Autowired
        public Executor asyncServiceExecutor;
        @PostMapping(value = "/test")
        public DeferredResult<R> test() throws ExecutionException, InterruptedException {
            DeferredResult deferredResult = new DeferredResult(10000L);
            CompletableFuture<R> result = CompletableFuture.supplyAsync(() -> {
                //耗时方法
            }, asyncServiceExecutor);
            deferredResult.setResult(result.get());
            return deferredResult;
        }
    }
    
    

    相关文章

      网友评论

          本文标题:ThreadPoolTaskExecutor小记

          本文链接:https://www.haomeiwen.com/subject/zynhjhtx.html