美文网首页
ThreadPoolExecutor用法解析

ThreadPoolExecutor用法解析

作者: 猪的尾巴 | 来源:发表于2017-09-10 21:18 被阅读0次

    大概用Executors直接生成ThreadPoolExecutor的习惯会被认为理所当然,但是在阿里的java开发规范中是禁止的,开发者必须使用ThreadPoolExecutor。其实没有什么好不好,Executors没法满足所有场景罢了。

    先贴一张网上找来的图介绍下线程池运行线程的流程

    image.png

    ThreadPoolExecutor

    司空见惯,ThreadPoolExecutor也有多个构造方法,并且都指向参数最多的一个:

    image.png

    下面贴出这个构造方法的代码:

        public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler) {
            if (corePoolSize < 0 ||
                maximumPoolSize <= 0 ||
                maximumPoolSize < corePoolSize ||
                keepAliveTime < 0)
                throw new IllegalArgumentException();
            if (workQueue == null || threadFactory == null || handler == null)
                throw new NullPointerException();
            this.corePoolSize = corePoolSize;
            this.maximumPoolSize = maximumPoolSize;
            this.workQueue = workQueue;
            this.keepAliveTime = unit.toNanos(keepAliveTime);
            this.threadFactory = threadFactory;
            this.handler = handler;
        }
    

    逐一解释

    corePoolSize

    核心线程数,当线程池初始化以后,不会立即创建所有的核心线程(除非主动调用了prestartAllCoreThreads()),而是有一个任务进来,如果线程数还没有达到corePoolSize,那么就选择新建一个线程,如果达到核心线程数,则放入阻塞队列中。


    maximumPoolSize

    因为corePoolSize的设置,溢出任务会被放入阻塞队列中,当阻塞队列满了,同时又有新任务进来,那么线程池会进行扩容,但是总的线程数量不会超过maximumPoolSize。


    keepAliveTime与unit

    keepAliveTime代表当线程空闲以后(前提是之前发生过阻塞队列满,并且增加了线程数量的情况),还能继续存活的时间,unit是时间单位,当时间超过keepAliveTime并且线程处于空闲状态,则被回收,回收的数量为:当前线程数 - corePoolSize。


    workQueue

    当前已有workQueue个线程在执行任务无法空闲,此时如果新加入任务,则放入workQueue(阻塞队列中),下面介绍几种阻塞队列。

    队列 说明
    ArrayBlockingQueue 基于数组结构的有界阻塞队列,按FIFO排序任务
    LinkedBlockingQuene 基于链表结构的阻塞队列,按FIFO排序任务,吞吐量通常要高于ArrayBlockingQuene
    SynchronousQuene 一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQuene
    priorityBlockingQuene priorityBlockingQuene:具有优先级的无界阻塞队列

    threadFactory

    线程工厂,主要对线程进行一些装饰,如我需要对我的线程有特殊的命名:

      static class TestThreadFactory implements ThreadFactory {
        private AtomicInteger threadNum = new AtomicInteger(0);
        public Thread newThread(Runnable r) {
          Thread t = new Thread(r);
          t.setName("test-thread-pool-" + threadNum.incrementAndGet());
          return t;
        }
      }   
    

    handler

    当线程池无法承受任务数量(当前线程数量 = maximumPoolSize),再有任务进来,将会采取拒绝策略,java提供了四种对超额任务的拒绝处理

    队列 说明
    ThreadPoolExecutor.AbortPolicy 丢弃任务并抛出RejectedExecutionException异常
    ThreadPoolExecutor.DiscardPolicy 也是丢弃任务,但是不抛出异常
    ThreadPoolExecutor.DiscardOldestPolicy 丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
    ThreadPoolExecutor.CallerRunsPolicy 由调用线程处理该任务

    一个简单的例子

    public class test {
    
      public static void main(String[] args){
        TestThreadFactory testThreadFactory = new TestThreadFactory();
    
        // 默认策略,直接抛出异常
        RejectedExecutionHandler abortPolicyHandler = new ThreadPoolExecutor.AbortPolicy();
    
        // 丢弃任务,不抛出异常
        RejectedExecutionHandler discardPolicyHandler = new ThreadPoolExecutor.DiscardPolicy();
    
        // 丢弃队列最前面的任务,重新尝试执行任务
        RejectedExecutionHandler discardOldestPolicyHandler = new ThreadPoolExecutor.DiscardOldestPolicy();
    
        // 由调用线程处处理该任务-移交给主线程执行(谨慎使用)
        RejectedExecutionHandler CallerRunsPolicyHandler = new ThreadPoolExecutor.CallerRunsPolicy();
    
    
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(3,
                                                                       10,
                                                                       1,
                                                                       TimeUnit.SECONDS,
                                                                       new ArrayBlockingQueue<>(
                                                                           1),
                                                                       testThreadFactory,
                                                                       CallerRunsPolicyHandler);
        System.out.println(threadPoolExecutor.getPoolSize());
        for (int i = 0; i <= 15; i++){
          threadPoolExecutor.execute(() -> {
            System.out.println("hello Jules, I am [" + Thread.currentThread().getName() + "]");
            try {
              Thread.sleep(2000);
            } catch (InterruptedException e) {
              e.printStackTrace();
            }
          });
        }
        try {
          Thread.sleep(4000);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
        System.out.println(threadPoolExecutor.getPoolSize());
    
      }
    
      /**
       * 利用TestThreadFactory修改线程名称
       */
      static class TestThreadFactory implements ThreadFactory {
        private AtomicInteger threadNum = new AtomicInteger(0);
        public Thread newThread(Runnable r) {
          Thread t = new Thread(r);
          t.setName("test-thread-pool-" + threadNum.incrementAndGet());
          return t;
        }
      }
    }
    

    结果:

    image.png

    采用了CallerRunsPolicyHandler策略,所以溢出线程由调用方(main)处理。


    欢迎关注公众号交流,定期分享源码心得


    image.png

    相关文章

      网友评论

          本文标题:ThreadPoolExecutor用法解析

          本文链接:https://www.haomeiwen.com/subject/tpytsxtx.html