美文网首页
ThreadPoolTaskExecutor小记

ThreadPoolTaskExecutor小记

作者: yellow_han | 来源:发表于2020-03-11 20:51 被阅读0次
  • corePoolSize(核心线程数)

    • 核心线程会一直存在,即使没有任务执行;
    • 当线程数小于核心线程数的时候,即使有空闲线程,也会一直创建线程直到达到核心线程数;
    • 设置allowCoreThreadTimeout=true(默认false)时,核心线程会超时关闭。
  • queueCapacity(任务队列容量)

    • 也叫阻塞队列,当核心线程都在运行,此时再有任务进来,会进入任务队列,排队等待线程执行。
  • maxPoolSize(最大线程数)

    • 线程池里允许存在的最大线程数量;
    • 当任务队列已满,且线程数量大于等于核心线程数时,会创建新的线程执行任务;
    • 线程池里允许存在的最大线程数量。当任务队列已满,且线程数量大于等于核心线程数时,会创建新的线程执行任务。
  • keepAliveTime(线程空闲时间)

    • 当线程空闲时间达到keepAliveTime时,线程会退出(关闭),直到线程数等于核心线程数;
    • 如果设置了allowCoreThreadTimeout=true,则线程会退出直到线程数等于零。
    • allowCoreThreadTimeout(允许核心线程超时)
    • ejectedExecutionHandler(任务拒绝处理器)
    • 当线程数量达到最大线程数,且任务队列已满时,会拒绝任务;
    • 调用线程池shutdown()方法后,会等待执行完线程池的任务之后,再shutdown()。如果在调用了shutdown()方法和线程池真正shutdown()之间提交任务,会拒绝新任务。
  • 任务执行解析

    • 如果线程池中线程数量 < 核心线程数,新建一个线程执行任务;
    • 如果线程池中线程数量 >= 核心线程数,则将任务放入任务队列
    • 如果线程池中线程数量 >= 核心线程数 且 < maxPoolSize,且任务队列满了,则创建新的线程;
    • 如果线程池中线程数量 > 核心线程数,当线程空闲时间超过了keepalive时,则会销毁线程;由此可见线程池的队列如果是无界队列,那么设置线程池最大数量是无效的;
    • 如果线程池中的任务队列满了,而且线程数达到了maxPoolSize,并且没有空闲的线程可以执行新的任务,这时候再提交任务就会执行拒绝策略
  • rejectedExecutionHandler字段用于配置拒绝策略,常用的拒绝策略如下:

    • AbortPolicy 抛出RejectedExecutionException。
    • CallerRunsPolicy 直接在execute方法的调用线程中运行被拒绝的任务。
    • DiscardOldestPolicy 放弃最旧的未处理请求,然后重试execute。
    • DiscardPolicy 下它将丢弃被拒绝的任务。

Spring Boot中异步线程池的配置

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

@Slf4j
@Configuration
public class AsyncExecutorConfig {

    @Bean
    public Executor asyncServiceExecutor() {
        log.info("start asyncServiceExecutor");
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //配置核心线程数
        //如果是IO密集型应用,则线程池大小设置为2N+1;
        //如果是CPU密集型应用,则线程池大小设置为N+1;
        executor.setCorePoolSize(6);
        //配置最大线程数
        executor.setMaxPoolSize(10);
        //配置队列大小
        executor.setQueueCapacity(10000);
        //配置线程池中的线程的名称前缀
        executor.setThreadNamePrefix("async-service-");
        // 设置拒绝策略:当pool已经达到max size的时候,如何处理新任务
        // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        //执行初始化
        executor.initialize();
        return executor;
    }
}

异步使用方法

import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.stereotype.Component;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

@Component
@Slf4j
public class AsyncService {

    @Async("asyncServiceExecutor")
    public Future<String> getAsyncResult1() {
        String result = "asyncResultTest1";
        try {
            Thread.sleep(3000);
            log.info("线程名称:{},睡眠:{}秒", Thread.currentThread().getName(), "3");

        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return new AsyncResult<String>(result);
    }

    @Async("asyncServiceExecutor")
    public Future<String> getAsyncResult2() {
        String result = "asyncResultTest2";
        try {
            Thread.sleep(4000);
            log.info("线程名称:{},睡眠:{}秒", Thread.currentThread().getName(), "4");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return new AsyncResult<String>(result);
    }
}
@RestController
@RequestMapping(value = "/test")
public class TestController extends BaseBeanController {

     //线程池,上面Config中定义的
    @Autowired
    public Executor asyncServiceExecutor;
    @PostMapping(value = "/test")
    public DeferredResult<R> test() throws ExecutionException, InterruptedException {
        DeferredResult deferredResult = new DeferredResult(10000L);
        CompletableFuture<R> result = CompletableFuture.supplyAsync(() -> {
            //耗时方法
        }, asyncServiceExecutor);
        deferredResult.setResult(result.get());
        return deferredResult;
    }
}

相关文章

网友评论

      本文标题:ThreadPoolTaskExecutor小记

      本文链接:https://www.haomeiwen.com/subject/zynhjhtx.html