美文网首页
线程池 - ThreadPoolExecutor

线程池 - ThreadPoolExecutor

作者: 我要做大牛23333 | 来源:发表于2018-06-26 18:10 被阅读0次

    在Java中一般我们会通过Executors来创建线程池,一般有四种:

    1. newCachedThreadPool
    2. newFixedThreadPool
    3. newSingleThreadExecutor
    4. newScheduledThreadPool

    这四种创建方式其实万变不离其宗,最底层都是使用的ThreadPoolExecutor,我们先来看下ThreadPoolExecutor的构造函数。

    public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler)
    

    总共有5个参数:

    • corePoolSize 是线程池的初始大小,可理解为desiredSize或者最小线程数;
    • maximumPoolSize 是最大可申请的线程数;
    • keepAliveTime 是当线程数大于corePoolSize时如果现成出于idle空闲状态,那么它将在keepAliveTime时间后被清理;
    • TimeUnit 就是keepAliveTime的时间单位,比如秒,微秒等;
    • BlockingQueue 是一个阻塞队列,如果熟悉生产者消费者模式或者PV操作那么对于它应该不陌生,这里下面会做进一步介绍;
    • ThreadFactory 用于创建线程,一般用默认实现;
    • RejectedExecutionHandler 是在线程池无法创建新线程并且队列达到最大限制时的处理策略,默认是AbortPolicy,由于这里有多种策略,并且不属于重点内容,不多做介绍;

    接下来我们挨个看下Executors里这几种构造函数到底是如何构造线程池的。

    newCachedThreadPool
    缓存线程池,可以理解为弹性线程池,个人认为比较适用于有特殊时段的高并发请求的场景。

    new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>());
    

    可以看到在线程池里默认是没有线程的,但却可以申请无限多的线程(只要系统资源够用),如果现成空闲60秒后会被自动回收,最后那个SynchronousQueue是一个阻塞队列,长度永远为0,peek等操作永远返回null,如果调用take或者put的话会被阻塞,直到有相应的生产者或者消费者,这个特性很关键,因为在判断是否需要创建新线程的时候回用到BlockQueue里的offer方法,如下:

    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
    

    首先判断目前的线程数是否小于corePoolSize,如果小于就直接新建一个,否则判断当前线程池是否仍在运行,这里说下ctl.get()这个方法,ctl是一个AtomicInteger类型,ctl.get()返回一个int类型,int类型一共4个字节,最高3位存储线程池的状态,后29位代表线程个数,一共有5种运行状态:

    // runState is stored in the high-order bits
        private static final int RUNNING    = -1 << COUNT_BITS;
        private static final int SHUTDOWN   =  0 << COUNT_BITS;
        private static final int STOP       =  1 << COUNT_BITS;
        private static final int TIDYING    =  2 << COUNT_BITS;
        private static final int TERMINATED =  3 << COUNT_BITS;
    

    如果仍在运行状态,则对队列执行offer操作,看能否插入成功,

    boolean offer(E e);
    

    成功返回true,失败返回false。好,关键的来了SynchronousQueue的特性是长度永远为0,并且它的offer方法默认是立刻返回,这是什么意思呢?就是说在执行offer的时候如果没有消费者等待则立刻返回false,表示插入失败
    此时会走到最后的if-else执行addWorker,addWorker会判断是否达到maximumPoolSize,没有的话就新建,如果无法再申请资源或达到上限则拒绝操作执行reject(command)
    这是newCachedThreadPool为何可以不停地申请资源创建线程的原因。

    newFixedThreadPool
    顾名思义,就是固定大小的线程池,老规矩,先看构造函数:

    new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
    

    可以看到corePoolSizemaximumPoolSize是一样的,不可伸缩,超时时间可以忽略了,因为不会再新建线程,最后的BlockingQueue用的是LinkedBlockingQueue,同学们注意了,这个队列和前面提到的SynchronousQueue有啥区别呢?这很关键,因为当决定是否新建线程还是放到等待队列里时就是看这个queue还能否插入,我们来看下LinkedBlockingQueue的实现:

    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
     }
    

    可以看到默认队列长度是最大值,由于offer方法太复杂所以就不放出来了,但效果是判断是否达到队列最大长度,如果没有那么就把任务先加入队列。
    所以这就是区别,在没有可用线程的时候线程池会不断地把任务放到LinkedBlockingQueue中等待可用线程

    newSingleThreadExecutor
    单线程线程池,如果所执行的任务抛出异常或者终止,该线程池会自动新建一个线程保证服务的可用。好处是可以保证任务是串行的,不用考虑并发同步等复杂的问题。

    new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>()));
    

    配置和上一个类似,只是线程数限制为1。FinalizableDelegatedExecutorService对ExecutorService做了一层封装,这里不详述了。

    newScheduledThreadPool
    用于执行定时任务

    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
                  new DelayedWorkQueue());
    

    可以看到这里使用了DelayedWorkQueue,在提交任务的时候

    public ScheduledFuture<?> schedule(Runnable command,
                                           long delay, TimeUnit unit);
    

    这里使用了DelayedWorkQueue的特性,在取操作的时候会block直到delay到期。由于本次没有详细研究DelayedWorkQueue所以不多做介绍,有兴趣的同学可以自己读下源码。
    在《Effective Java II》和《Java并发编程实践》里都强调了在执行定时任务的时候最好使用线程池,而不要用Timer,因为线程池可以很好的维护线程状态,确保不会异常退出造成混乱。

    总结

    在使用线程池的时候最好使用Executors封装好的方法,如果想要定制化可以参考ThreadPoolExecutor的构造函数,自定义参数,切记选择合适的队列类型。相信在了解了底层实现原理后对线程池的控制会更有自信。

    1_m7BOUXlstifMXvHY5BXHjw.jpeg

    相关文章

      网友评论

          本文标题:线程池 - ThreadPoolExecutor

          本文链接:https://www.haomeiwen.com/subject/kmemyftx.html