美文网首页
ThreadPoolExecutor线程池参数

ThreadPoolExecutor线程池参数

作者: 丶兔小胖 | 来源:发表于2019-08-06 13:41 被阅读0次

    ThreadPoolExecutor线程池参数

    1. 背景
    公司一个业务需要循环多次(超过2000次),后将拿到的数据写入数据库中。
    
    2. 解决方法
    基于业务的特点,想到了两个方法
    1. 对数据库大批量的写进行分批次提交,基于JdbcTemplates,或者Batch提交
    2. 创建多个线程池,使用CountDownLatch计数。
    
    3. 应用场景
    应用场景:线程的创建和销毁是一个耗时的操作。如果在程序中频繁的创建和销毁线程,会对程序的反应速度造成严重的影响。有时候可能导致crash掉。如果在频繁的使用线程就可以使用线程池代替。
    
    5.参数说明
    1. corePoolSize:线程池基本大小 核心线程数

      1.  核心线程 数一定会存活,即使没有任务需要执行
      
      2. tasks(任务数) <coorePooSize 当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,tasks(任务数) > coorePooSize,会放入之后的队列
      
      3. 设置allowCoreThreadTimeout=true(默认false)时,核心线程会超时关闭
      
      4. 如果调用了线程池的prestartAllCoreThreads方法,线程池会提前创建并启动所有基本线程。
      
    2. queueCapacity(任务队列容量)

      1. ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序
      
      2. LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按FIFO (先进先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列
      
      3. SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列。
      
      4. PriorityBlockingQueue:一个具有优先级得无限阻塞队列
      
    3. maxPoolSize 最大线程数

      1. 当线程数>=corePoolSize,且任务队列已满时。线程池会创建新线程来处理任务
      2. 当线程数=maxPoolSize,且任务队列已满时,线程池会拒绝处理任务而抛出异常
      3. 如果使用了无界的任务队列这个参数就没什么效果
      
    4. keepAliveTime:线程空闲时间

      1. 当线程空闲时间达到keepAliveTime时,线程会退出,直到线程数量=corePoolSize
      2. 如果allowCoreThreadTimeout=true,则会直到线程数量=0
      
    5. RejectedExecutionHandler: 拒绝策略

      当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。这个策略默认情况下是AbortPolicy,表示无法处理新任务时抛出异常。以下是JDK1.5提供的四种策略。

      1. ThreadPoolExecutor.AbortPolicy:
         当线程池中的数量等于最大线程数时抛 java.util.concurrent.RejectedExecutionException 异                常,涉及到该异常的任务也不会被执行,线程池默认的拒绝策略就是该策略。
      2. ThreadPoolExecutor.DiscardPolicy():
         当线程池中的数量等于最大线程数时,默默丢弃不能执行的新加任务,不报任何异常。
      3. ThreadPoolExecutor.CallerRunsPolicy():
         当线程池中的数量等于最大线程数时,重试添加当前的任务;它会自动重复调用execute()方法。
      4. ThreadPoolExecutor.DiscardOldestPolicy():
         当线程池中的数量等于最大线程数时,抛弃线程池中工作队列头部的任务(即等待时间最久的任务),并执行当前任务。
      
    5. 线程池工作顺序

    1.当线程池中线程数量小于 corePoolSize 则创建线程,并处理请求。

    2. 当线程池中线程数量大于等于 corePoolSize 时,则把请求放入 workQueue 中,随着线程池中的核心线程们不断执行任务,只要线程池中有空闲的核心线程,线程池就从 workQueue 中取任务并处理。

    3. 当 taskQueue 已存满,放不下新任务时则新建非核心线程入池,并处理请求直到线程数目达到 maximumPoolSize(最大线程数量设置值)。

    4. 如果线程池中线程数大于 maximumPoolSize 则使用 RejectedExecutionHandler 来进行任务拒绝处理。

    6. 如何设置参数

    参考了这篇:ThreadPoolExecutor线程池参数设置技巧

    • 默认值
      • corePoolSize=1
      • queueCapacity=Integer.MAX_VALUE
      • maxPoolSize=Integer.MAX_VALUE
      • keepAliveTime=60s
      • allowCoreThreadTimeout=false
      • rejectedExecutionHandler=AbortPolicy()
    • 如何来设置
      • 需要根据几个值来决定
        • tasks :每秒的任务数,假设为500~1000
        • taskcost:每个任务花费时间,假设为0.1s
        • responsetime:系统允许容忍的最大响应时间,假设为1s
      • 做几个计算
        • corePoolSize = 每秒需要多少个线程处理?
          • threadcount = tasks/(1/taskcost) =taskstaskcout = (500~1000)0.1 = 50~100 个线程。corePoolSize设置应该大于50
          • 根据8020原则,如果80%的每秒任务数小于800,那么corePoolSize设置为80即可
        • queueCapacity = (coreSizePool/taskcost)*responsetime
          • 计算可得 queueCapacity = 80/0.1*1 = 80。意思是队列里的线程可以等待1s,超过了的需要新开线程来执行
          • 切记不能设置为Integer.MAX_VALUE,这样队列会很大,线程数只会保持在corePoolSize大小,当任务陡增时,不能新开线程来执行,响应时间会随之陡增。
        • maxPoolSize = (max(tasks)- queueCapacity)/(1/taskcost)
          • 计算可得 maxPoolSize = (1000-80)/10 = 92
          • (最大任务数-队列容量)/每个线程每秒处理能力 = 最大线程数
        • rejectedExecutionHandler:根据具体情况来决定,任务不重要可丢弃,任务重要则要利用一些缓冲机制来处理
        • keepAliveTime和allowCoreThreadTimeout采用默认通常能满足
    7. 代码
    //参数初始化
    private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();
    //核心线程数量大小
    private static final int corePoolSize = Math.max(2, Math.min(CPU_COUNT - 1, 4));
    //线程池最大容纳线程数
    private static final int maximumPoolSize = CPU_COUNT * 2 + 1;
    //线程空闲后的存活时长
    private static final int keepAliveTime = 30;
     
    //任务过多后,存储任务的一个阻塞队列
    BlockingQueue<Runnable>  workQueue = new SynchronousQueue<>();
     
    //线程的创建工厂
    ThreadFactory threadFactory = new ThreadFactory() {
        private final AtomicInteger mCount = new AtomicInteger(1);
     
        public Thread newThread(Runnable r) {
            return new Thread(r, "AdvacnedAsyncTask #" + mCount.getAndIncrement());
        }
    };
     
    //线程池任务满载后采取的任务拒绝策略
    RejectedExecutionHandler rejectHandler = new ThreadPoolExecutor.DiscardOldestPolicy();
     
    //线程池对象,创建线程
    ThreadPoolExecutor mExecute = new ThreadPoolExecutor(
            corePoolSize, //线程池的核心线程数
            maximumPoolSize,//线程池所能容纳的最大线程数
            keepAliveTime,//线程的空闲时间
            TimeUnit.SECONDS,//keepAliveTime对应的单位
            workQueue,//线程池中的任务队列
            threadFactory, //线程工厂
            rejectHandler//当任务无法被执行时的拒绝策略
    );
    

    相关文章

      网友评论

          本文标题:ThreadPoolExecutor线程池参数

          本文链接:https://www.haomeiwen.com/subject/lhbidctx.html