美文网首页
java多线程之ThreadPoolExecutor

java多线程之ThreadPoolExecutor

作者: eliteTyc | 来源:发表于2020-03-09 14:13 被阅读0次

《阿里巴巴java开发手册》中指出了线程资源必须通过线程池提供,不允许在应用中自行显示的创建线程,这样一方面是线程的创建更加规范,可以合理控制开辟线程的数量;另一方面线程的细节管理交给线程池处理,优化了资源的开销。而线程池不允许使用Executors去创建,而要通过ThreadPoolExecutor方式,这一方面是由于jdk中Executor框架虽然提供了如newFixedThreadPool()、newSingleThreadExecutor()、newCachedThreadPool()等创建线程池的方法,但都有其局限性,不够灵活;另外由于前面几种方法内部也是通过ThreadPoolExecutor方式实现,使用ThreadPoolExecutor有助于大家明确线程池的运行规则,创建符合自己的业务场景需要的线程池,避免资源耗尽的风险。

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)

参数讲解
corePoolSize:核心线程数,如果他们创建了,它们会一直存在于线程池中,直到线程池关闭

maximunPoolSize:线程池可创建的最大线程数(核心线程与辅线程总和),这个值必须大于corePoolSize的值,否则会抛IllegalArgumentException

keepAliveTime:辅线程的空闲等待时间,一个辅线程等待多少时间还没有任务给它处理,它就被销毁

unit:keepAliveTime的单位(天,小时,分钟,秒,毫秒等等)

workQueue:任务等待队列,如果需要处理的任务大于线程池中的线程数,未处理的任务会暂存在任务队列(ps:不一定会保存任务到队列,需要根据当前任务队列的类型来判断,后面详细讲解)

threadFactory:线程创建工厂,一般使用默认(Executors.defaultThreadFactory())即可,可以自定义

handler:拒接策略,如果线程池中线程都在处理任务,任务队列中保存的任务也达到最大值,不能再接收任务了,如何拒绝

重要参数详解

拒绝策略:RejectedExecutionHandler,配置当线程池超载时的反应,(线程池已满且待处理任务大于任务等待队列时做出的反应)

  • ThreadPoolExecutor.AbortPolicy():直接抛出异常

  • ThreadPoolExecutor.CallerRunsPolicy():将超出任务队列的任务放在调用者线程执行

  • ThreadPoolExecutor.DiscardOledestPolicy():丢弃任务队列中最老的那个任务,再次提交无法加入的任务

  • ThreadPoolExecutor.DiscardPolicy():丢弃无法加入的任务

  • 自定义策略:实现RejectedExecutionHandler接口

    class MyExecutionHandler implements RejectedExecutionHandler{
    
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            System.out.println("自定义拒绝策略:"+r.toString());
        }
    }
    

等待队列:workQueue可以分为 直接提交队列有界任务队列无界任务队列优先任务队

先新建一个Runnable任务,方便后面测试

class Printer implements Runnable{
    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName()+"正在执行。");
    }
}
  • 直接提交队列:可以使用SynchronousQueue来创建,即任务队列的容量为0,如果当前线程池线程都在处理任务,在来一个任务,就无法处理了,也不会保存在任务队列

    • 新建一个核心线程为1,最大线程数为2,空闲线程等待时间设置为30秒,等待队列为 直接提交队列,默认工厂,拒绝策略为直接抛出异常,执行10个任务

      ThreadPoolExecutor threadPoolExecutor = null;
              try {
                  threadPoolExecutor = new ThreadPoolExecutor(1, 2, 30,
                          TimeUnit.SECONDS,
      //                    设置任务队列为- 直接提交队列
                          new SynchronousQueue<>(),
                          Executors.defaultThreadFactory(),
                          new ThreadPoolExecutor.AbortPolicy()
                  );
                  Printer printer = new Printer();
                  for (int i = 0; i < 10; i++) {
                      threadPoolExecutor.execute(printer);
                  }
              } finally {
                  //        关闭线程池
                  threadPoolExecutor.shutdown();
      
              }
      
    • 执行结果为:

      pool-1-thread-1正在执行。
      pool-1-thread-2正在执行。
      Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task threads.Printer@5e481248 rejected from java.util.concurrent.ThreadPoolExecutor@66d3c617[Running, pool size = 2, active threads = 0, queued tasks = 0, completed tasks = 2]
          at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
          at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
          at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
          at threads.ThreadPoolTest.main(ThreadPoolTest.java:29)
      
  • 有界任务队列:使用ArrayBlockingQueue, 如果任务超过corePoolSize,就继续创建直到maximunPoolSize,如果超过maximunPoolSize,就继续存放到这个有界任务队列,队列只能存指定数量的任务(例如10个,如果还有超过10个的任务待处理,会执行拒绝策略)

    • 新建一个核心线程为1,最大线程数为2,空闲线程等待时间设置为30秒,等待队列为 有界任务队列,默认工厂,拒绝策略为直接抛出异常,执行30个任务

       ThreadPoolExecutor threadPoolExecutor = null;
              try {
                  threadPoolExecutor = new ThreadPoolExecutor(1, 2, 30,
                          TimeUnit.SECONDS,
      //                    设置任务队列为- 有界提交队列
                          new ArrayBlockingQueue<>(10),
                          Executors.defaultThreadFactory(),
                          new ThreadPoolExecutor.AbortPolicy()
                  );
                  Printer printer = new Printer();
                  for (int i = 0; i < 30; i++) {
                      threadPoolExecutor.execute(printer);
                  }
              } finally {
                  //        关闭线程池
                  threadPoolExecutor.shutdown();
      
              }
      
  • 执行结果

    pool-1-thread-1正在执行。
    pool-1-thread-2正在执行。
    pool-1-thread-1正在执行。
    pool-1-thread-1正在执行。
    pool-1-thread-1正在执行。
    pool-1-thread-1正在执行。
    pool-1-thread-2正在执行。
    pool-1-thread-1正在执行。
    pool-1-thread-2正在执行。
    pool-1-thread-1正在执行。
    pool-1-thread-1正在执行。
    pool-1-thread-1正在执行。
    pool-1-thread-2正在执行。
    Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task threads.Printer@6f94fa3e rejected from java.util.concurrent.ThreadPoolExecutor@5e481248[Running, pool size = 2, active threads = 0, queued tasks = 0, completed tasks = 13]
        at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
        at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
        at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
        at threads.ThreadPoolTest.main(ThreadPoolTest.java:33)
    
    
  • 无界任务队列:使用LinkedBlockingQueue,就是任务队列可以无限制的存储任务,直到 内存溢出,同时使用这中任务队列,会使maximunPoolSize失效,线程池最多会存在corePoolSize个线程去处理任务

    • 新建一个核心线程为1,最大线程数为2,空闲线程等待时间设置为30秒(因为maximunPoolSize,会失效,就不会存在辅线程了),等待队列为 无界任务队列,默认工厂,拒绝策略为直接抛出异常,执行30个任务

       ThreadPoolExecutor threadPoolExecutor = null;
              try {
                  threadPoolExecutor = new ThreadPoolExecutor(1, 2, 30,
                          TimeUnit.SECONDS,
      //                    设置任务队列为- 无界提交队列
                          new LinkedBlockingQueue<>(),
                          Executors.defaultThreadFactory(),
                          new ThreadPoolExecutor.AbortPolicy()
                  );
                  Printer printer = new Printer();
                  for (int i = 0; i < 30; i++) {
                      threadPoolExecutor.execute(printer);
                  }
              } finally {
                  //        关闭线程池
                  threadPoolExecutor.shutdown();
      
              }
      
      
    • 执行结果:可以看到,虽然设置了maximunPoolSize为2,但是只有一个核心线程在处理任务

      pool-1-thread-1正在执行。
      pool-1-thread-1正在执行。
      pool-1-thread-1正在执行。
      pool-1-thread-1正在执行。
      pool-1-thread-1正在执行。
      pool-1-thread-1正在执行。
      pool-1-thread-1正在执行。
      pool-1-thread-1正在执行。
      pool-1-thread-1正在执行。
      pool-1-thread-1正在执行。
      pool-1-thread-1正在执行。
      pool-1-thread-1正在执行。
      pool-1-thread-1正在执行。
      pool-1-thread-1正在执行。
      pool-1-thread-1正在执行。
      pool-1-thread-1正在执行。
      pool-1-thread-1正在执行。
      pool-1-thread-1正在执行。
      pool-1-thread-1正在执行。
      pool-1-thread-1正在执行。
      pool-1-thread-1正在执行。
      pool-1-thread-1正在执行。
      pool-1-thread-1正在执行。
      pool-1-thread-1正在执行。
      pool-1-thread-1正在执行。
      pool-1-thread-1正在执行。
      pool-1-thread-1正在执行。
      pool-1-thread-1正在执行。
      pool-1-thread-1正在执行。
      pool-1-thread-1正在执行。
      
  • 优先任务队列:根据任务优先级来进行执行,maximunPoolSize也会失效,线程池最多存在corePoolSize个核心线程,不会创建辅线程

    • 修改Printer类:提供一个优先级变量,和对象的比较方法

      class Printer implements Runnable,Comparable<Printer> {
      
          private int priority;
      
          public Printer(int priority) {
              this.priority = priority;
          }
      
          public int getPriority() {
              return priority;
          }
      
          public void setPriority(int priority) {
              this.priority = priority;
          }
      
          @Override
          public void run() {
              System.out.println(Thread.currentThread().getName() + "正在执行。优先级是:"+priority);
          }
      
          @Override
          public int compareTo(Printer o) {
              return this.priority>o.getPriority()?-1:1;
          }
      }
      
    • 新建一个核心线程为1,最大线程数为2,空闲线程等待时间设置为30秒(同样会使maximunPoolSize失效,只会存在核心线程),等待队列为 优先任务队列,默认工厂,拒绝策略为直接抛出异常,执行30个任务

      ThreadPoolExecutor threadPoolExecutor = null;
              try {
                  threadPoolExecutor = new ThreadPoolExecutor(1, 2, 30,
                          TimeUnit.SECONDS,
      //                    设置任务队列为- 优先任务队列
                          new PriorityBlockingQueue<>(),
                          Executors.defaultThreadFactory(),
                          new ThreadPoolExecutor.AbortPolicy()
                  );
      
                  for (int i = 0; i < 30; i++) {
                      threadPoolExecutor.execute(new Printer(i));
                  }
              } finally {
                  //        关闭线程池
                  threadPoolExecutor.shutdown();
      
              }
      
    • 输出结果:可以看到除了第一个任务是0,其他的都按照优先级来执行了,因为执行到第一个任务,后面的任务被放到任务队列,并重新排序了,所以后面开始是按照优先级顺序进行执行,同时处理任务的线程也只有一个

      pool-1-thread-1正在执行。优先级是:0
      pool-1-thread-1正在执行。优先级是:29
      pool-1-thread-1正在执行。优先级是:28
      pool-1-thread-1正在执行。优先级是:27
      pool-1-thread-1正在执行。优先级是:26
      pool-1-thread-1正在执行。优先级是:25
      pool-1-thread-1正在执行。优先级是:24
      pool-1-thread-1正在执行。优先级是:23
      pool-1-thread-1正在执行。优先级是:22
      pool-1-thread-1正在执行。优先级是:21
      pool-1-thread-1正在执行。优先级是:20
      pool-1-thread-1正在执行。优先级是:19
      pool-1-thread-1正在执行。优先级是:18
      pool-1-thread-1正在执行。优先级是:17
      pool-1-thread-1正在执行。优先级是:16
      pool-1-thread-1正在执行。优先级是:15
      pool-1-thread-1正在执行。优先级是:14
      pool-1-thread-1正在执行。优先级是:13
      pool-1-thread-1正在执行。优先级是:12
      pool-1-thread-1正在执行。优先级是:11
      pool-1-thread-1正在执行。优先级是:10
      pool-1-thread-1正在执行。优先级是:9
      pool-1-thread-1正在执行。优先级是:8
      pool-1-thread-1正在执行。优先级是:7
      pool-1-thread-1正在执行。优先级是:6
      pool-1-thread-1正在执行。优先级是:5
      pool-1-thread-1正在执行。优先级是:4
      pool-1-thread-1正在执行。优先级是:3
      pool-1-thread-1正在执行。优先级是:2
      pool-1-thread-1正在执行。优先级是:1
      
      

执行任务的过程监控

beforeExecute:任务执行之前调用

afterExecute:任务执行结果后调用

terminated:线程池关闭之后调用

ThreadPoolExecutor threadPoolExecutor = null;
try {
    threadPoolExecutor = new ThreadPoolExecutor(1, 2, 30,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(),
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.AbortPolicy()
    ){
        @Override
        protected void beforeExecute(Thread t, Runnable r) {
            super.beforeExecute(t, r);
            System.out.println("线程:"+t.getName()+"--正在执行:优先级"+((Printer) r).getPriority());
        }

        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            super.afterExecute(r, t);
            System.out.println("优先级"+((Printer) r).getPriority()+"--执行完成");
        }

        @Override
        protected void terminated() {
            super.terminated();
            System.out.println("线程池退出******");
        }
    };

    for (int i = 0; i < 30; i++) {
        threadPoolExecutor.execute(new Printer(i));
    }
} finally {
    //        关闭线程池
    threadPoolExecutor.shutdown();

}
  • 打印输出为:

    线程:pool-1-thread-1--正在执行:优先级0
    pool-1-thread-1正在执行。优先级是:0
    优先级0--执行完成
    线程:pool-1-thread-1--正在执行:优先级1
    pool-1-thread-1正在执行。优先级是:1
    优先级1--执行完成
    线程:pool-1-thread-1--正在执行:优先级2
    pool-1-thread-1正在执行。优先级是:2
    优先级2--执行完成
    线程:pool-1-thread-1--正在执行:优先级3
    pool-1-thread-1正在执行。优先级是:3
    优先级3--执行完成
    线程:pool-1-thread-1--正在执行:优先级4
    pool-1-thread-1正在执行。优先级是:4
    优先级4--执行完成
    线程:pool-1-thread-1--正在执行:优先级5
    pool-1-thread-1正在执行。优先级是:5
    优先级5--执行完成
    线程:pool-1-thread-1--正在执行:优先级6
    pool-1-thread-1正在执行。优先级是:6
    优先级6--执行完成
    线程:pool-1-thread-1--正在执行:优先级7
    pool-1-thread-1正在执行。优先级是:7
    优先级7--执行完成
    线程:pool-1-thread-1--正在执行:优先级8
    pool-1-thread-1正在执行。优先级是:8
    优先级8--执行完成
    线程:pool-1-thread-1--正在执行:优先级9
    pool-1-thread-1正在执行。优先级是:9
    优先级9--执行完成
    线程:pool-1-thread-1--正在执行:优先级10
    pool-1-thread-1正在执行。优先级是:10
    优先级10--执行完成
    线程:pool-1-thread-1--正在执行:优先级11
    pool-1-thread-1正在执行。优先级是:11
    优先级11--执行完成
    线程:pool-1-thread-1--正在执行:优先级12
    pool-1-thread-1正在执行。优先级是:12
    优先级12--执行完成
    线程:pool-1-thread-1--正在执行:优先级13
    pool-1-thread-1正在执行。优先级是:13
    优先级13--执行完成
    线程:pool-1-thread-1--正在执行:优先级14
    pool-1-thread-1正在执行。优先级是:14
    优先级14--执行完成
    线程:pool-1-thread-1--正在执行:优先级15
    pool-1-thread-1正在执行。优先级是:15
    优先级15--执行完成
    线程:pool-1-thread-1--正在执行:优先级16
    pool-1-thread-1正在执行。优先级是:16
    优先级16--执行完成
    线程:pool-1-thread-1--正在执行:优先级17
    pool-1-thread-1正在执行。优先级是:17
    优先级17--执行完成
    线程:pool-1-thread-1--正在执行:优先级18
    pool-1-thread-1正在执行。优先级是:18
    优先级18--执行完成
    线程:pool-1-thread-1--正在执行:优先级19
    pool-1-thread-1正在执行。优先级是:19
    优先级19--执行完成
    线程:pool-1-thread-1--正在执行:优先级20
    pool-1-thread-1正在执行。优先级是:20
    优先级20--执行完成
    线程:pool-1-thread-1--正在执行:优先级21
    pool-1-thread-1正在执行。优先级是:21
    优先级21--执行完成
    线程:pool-1-thread-1--正在执行:优先级22
    pool-1-thread-1正在执行。优先级是:22
    优先级22--执行完成
    线程:pool-1-thread-1--正在执行:优先级23
    pool-1-thread-1正在执行。优先级是:23
    优先级23--执行完成
    线程:pool-1-thread-1--正在执行:优先级24
    pool-1-thread-1正在执行。优先级是:24
    优先级24--执行完成
    线程:pool-1-thread-1--正在执行:优先级25
    pool-1-thread-1正在执行。优先级是:25
    优先级25--执行完成
    线程:pool-1-thread-1--正在执行:优先级26
    pool-1-thread-1正在执行。优先级是:26
    优先级26--执行完成
    线程:pool-1-thread-1--正在执行:优先级27
    pool-1-thread-1正在执行。优先级是:27
    优先级27--执行完成
    线程:pool-1-thread-1--正在执行:优先级28
    pool-1-thread-1正在执行。优先级是:28
    优先级28--执行完成
    线程:pool-1-thread-1--正在执行:优先级29
    pool-1-thread-1正在执行。优先级是:29
    优先级29--执行完成
    线程池退出******
    

相关文章

网友评论

      本文标题:java多线程之ThreadPoolExecutor

      本文链接:https://www.haomeiwen.com/subject/vpkpdhtx.html