美文网首页Java并发
Java并发系列:线程池ThreadPoolExecutor参数

Java并发系列:线程池ThreadPoolExecutor参数

作者: h2coder | 来源:发表于2020-09-01 19:04 被阅读0次

    上篇回顾

    上篇我们讲了,线程池的创建、提交任务到线程池、关闭线程池。基本使用我们已经没有多大问题了。上节我们创建线程池时,都是使用Executors这个工厂类的静态方法创建线程池,其实里面内置的参数并不一定适合我们的场景,这时候就需要自定义参数了,一起来学习一下吧~

    • 上篇我们使用的Executors创建线程池
    //创建一个线程池,容量大小为Integer.MAX_VALUE,所以是无限大
    ExecutorService pool = Executors.newCachedThreadPool();
    
    //创建容量为1的线程池,它的执行是串行的
    ExecutorService pool = Executors.newSingleThreadExecutor();
    
    //创建固定容量大小的线程池
    ExecutorService pool = Executors.newFixedThreadPool(10);
    
    //周期性任务线程池
    ScheduledExecutorService pool = Executors.newScheduledThreadPool(10);
    

    线程池创建参数分析

    在上面,我们使用Executors类,无论是newCachedThreadPool(),还是newSingleThreadExecutor()等等,都是创建了ThreadPoolExecutor的实例。

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
    
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
    

    我们来看一下ThreadPoolExecutor类的构造方法。

    public class ThreadPoolExecutor extends AbstractExecutorService {
        private static final RejectedExecutionHandler defaultHandler =
            new AbortPolicy();
    
        //...
        public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue) {
            this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                 Executors.defaultThreadFactory(), defaultHandler);
        }
        
        public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  RejectedExecutionHandler handler) {
            this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                 Executors.defaultThreadFactory(), handler);
        }
        
        public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory) {
            this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                 threadFactory, defaultHandler);
        }
        
        public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler) {
            if (corePoolSize < 0 ||
                maximumPoolSize <= 0 ||
                maximumPoolSize < corePoolSize ||
                keepAliveTime < 0)
                throw new IllegalArgumentException();
            if (workQueue == null || threadFactory == null || handler == null)
                throw new NullPointerException();
            this.corePoolSize = corePoolSize;
            this.maximumPoolSize = maximumPoolSize;
            this.workQueue = workQueue;
            this.keepAliveTime = unit.toNanos(keepAliveTime);
            this.threadFactory = threadFactory;
            this.handler = handler;
        }
        //...
    }
    

    我们看到,ThreadPoolExecutor线程池的前3个构造方法缺省参数的,都是调用第四个构造方法。接下来我们来分析这几个构造参数的含义~

    1. corePoolSize,核心线程数,这个数量,是线程池始终保持的线程数量,即使没有任务执行,也不会终止。默认线程池创建后,在没有任务来之前,是没有线程存在的,除非调用了prestartAllCoreThreads()或prestartCoreThread()这2个方法,这2个方法是用来预先创建线程的,那么就会根据这个核心线程数进行预创建线程,并放到线程池中。

    2. maximumPoolSize,线程池最大线程数,他表示这个线程池最多能创建多少个线程。

    3. keepAliveTime,空闲存活时间。即线程在没有执行任务时的存活时间,默认情况下,这个存活时间,只在线程池的线程数大于corePoolSize核心线程数时才生效,例如当前线程数5是大于核心线程数3的,那么多出的那2个线程,在空闲(没有执行任务)时,如果空闲时间大于keepAliveTime,就会被终止,直到线程池的线程数不超过corePoolSize核心线程数。但如果调用了allowCoreThreadTimeOut(true)允许核心线程池超时,即使线程池中的线程数不大于corePoolSize核心线程数,keepAliveTime空闲存活时间也会生效,直到线程池的线程数为0。

    4. unit,keepAliveTime空闲存活时间的时间单位。

    //天
    TimeUnit.DAYS;
    //小时
    TimeUnit.HOURS;
    //分钟
    TimeUnit.MINUTES;
    //秒
    TimeUnit.SECONDS;
    //毫秒
    TimeUnit.MILLISECONDS;
    //微秒
    TimeUnit.MICROSECONDS;
    //纳秒
    TimeUnit.NANOSECONDS;
    
    1. workQueue,BlockingQueue类型的阻塞队列接口,用于存储线程池中待执行的任务。一般为LinkedBlockingQueue或SynchronousQueue。

    2. threadFactory,线程工厂,创建线程对象的工厂类。

    3. handler,RejectedExecutionHandler接口,是当线程池拒绝加入任务时使用的任务拒绝处理器。拒绝处理器有以下4种。

    //丢弃任务,并抛出RejectedExecutionException异常。 
    ThreadPoolExecutor.AbortPolicy
    //忽略,什么都不发生,即抛弃任务就不管了
    ThreadPoolExecutor.DiscardPolicy
    //从队列中踢出最先进入队列(最后一个执行)的任务,然后尝试执行任务
    ThreadPoolExecutor.DiscardOldestPolicy
    //使用线程处理该任务
    ThreadPoolExecutor.CallerRunsPolicy
    

    ThreadPoolExecutor的7个参数理解

    ThreadPoolExecutor的构造方法既有7个,怎么理解呢?

    • 举个栗子,有3位安卓工程师,核心员工,需要做一个项目,那么数量3就是corePoolSize-核心线程数。但是工期实在太赶,只有2个星期,3位工程师实在做不完,项目经理就委派HR大佬threadFactory招来了2位做外包的小伙伴,3+2=5,那么5就是maximumPoolSize-最大线程数。2个星期过后,项目顺利做完了,2位小伙伴就开始空闲了(基本没什么活干了),那么keepAliveTime-空闲时间和unit空闲时间单位就是他们的空闲时间,一到空闲时间,项目经理就将2位小伙伴“炒掉了”(请人还是要钱嘛)。过了没多久,App的第二版需求就出来了,一堆的需求蜂拥而至,项目经理要分辨哪些是紧急需求,哪些是不紧急需求,将需求整理放到一个workQueue任务队列中排队,3位小伙伴就开始工作啦~,临近第二期尾期了,突然产品经理提出,我要加需求!,这时候就需要handler任务拒绝处理器来决定了,是丢弃任务下期再做呢,还是3位小伙伴加加班搞定。

    AbstractExecutorService、ExecutorService和Executor

    //ThreadPoolExecutor继承于抽象类AbstractExecutorService
    public class ThreadPoolExecutor extends AbstractExecutorService {
        //...
        
        //实现execute()执行任务方法
        public void execute(Runnable command) {
            //...
        }
        
        //...
    }
    
    //抽象类AbstractExecutorService实现了ExecutorService接口
    public abstract class AbstractExecutorService implements ExecutorService {
        //...
        
        //对submit()系列方法的实现
        public Future<?> submit(Runnable task) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<Void> ftask = newTaskFor(task, null);
            execute(ftask);
            return ftask;
        }
        
        //对invokeAny()的实现
        public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                               long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
              //...
        }
        
        //对invokeAll()的实现
        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
            throws InterruptedException {
                //...
            }
        
        //...
    }
    
    //ExecutorService接口继承了Executor接口
    public interface ExecutorService extends Executor {
        //...
    
        //关闭线程池
        void shutdown();
        
        //提交有返回值的任务到线程池,submit有多个重载方法,这里不演示了
        <T> Future<T> submit(Callable<T> task);
        
        //执行任务集合中的任意一个有返回值的任务
        <T> T invokeAny(Collection<? extends Callable<T>> tasks)
            throws InterruptedException, ExecutionException;
         
        //执行任务集合中的所有任务   
        <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;
        
        //...
    }
    
    //Executor接口中只定义了一个execute()方法,执行Runnable任务
    public interface Executor {
        void execute(Runnable command);
    }
    
    • 先从最底开始,Executor接口定义了execute()方法,一个指定方法,证明Executor接口能执行一个无返回值的任务。

    • ExecutorService接口,继承Executor,定义了例如shutdown(),submit(),invokeAny(),invokeAll()等方法。

    • AbstractExecutorService抽象类,实现了ExecutorService接口,对submit(),invokeAny(),invokeAll()等在ExecutorService接口中申明的方法提供公共实现。

    • 最后我们使用的ThreadPoolExecutor线程池类,继承于AbstractExecutorService抽象类,实现execute()等还未实现的方法,以及线程池的具体实现。

    如何合理设置线程池参数

    最简化公式

    • CPU 密集型应用:线程池大小设置为 N + 1

    • IO 密集型应用:线程池大小设置为 2N

    公式的意义在于避免陷入极端情况。其中,计算密集型任务假设“等待时间/计算时间”等于0,IO密集型任务假设“等待时间/计算时间”等于1。

    为什么要有+1呢?

    这是因为,就算是计算密集型任务,也可能存在缺页等问题(需要了解虚拟内存和物理内存的分配),产生“隐式”的IO。多一个额外的线程能确保CPU时钟周期不会被浪费,又不至于增加太多线程调度成本。

    严格公式

    假设每个线程的“等待时间 / 计算时间”大小相等,显然,“计算时间 / (计算时间 + 等待时间)”也相等。对1个线程而言,只有计算时间占用了逻辑CPU,假设这个线程一直运行在同1个逻辑CPU上,显然,该逻辑CPU的CPU利用率即等于“计算时间 / (计算时间 + 等待时间)”。

    对于多个线程的情况是一样的,则有公式:

    逻辑CPU数 * CPU利用率 / 线程数 = 计算时间 / (计算时间 + 等待时间)
    

    也等于:

    线程数 = (1 + 等待时间/计算时间) * 逻辑CPU数 * CPU利用率
    
    • “1 + 等待时间/计算时间” 只与任务本身有关。

    • 逻辑CPU数可通过cat /proc/cpuinfo | grep -c processor得到。

    • 标准的CPU利用率要通过实际监控得到,但在估算线程池大小时,应看做“期望得到的CPU利用率”,即可分配给该任务的CPU比例。如果只打算分配一半CPU给任务的话,就是0.5。

    如果估算得到的线程数比较多,那么还要适当提高可分配的CPU比例,因为线程切换的成本随线程数增加而增加。如果竞争较激烈,则可以适当降低可分配的CPU比例,因为竞争通常也会导致线程阻塞,使CPU空闲。

    相关文章

      网友评论

        本文标题:Java并发系列:线程池ThreadPoolExecutor参数

        本文链接:https://www.haomeiwen.com/subject/yppbjktx.html