美文网首页
Java线程池的使用

Java线程池的使用

作者: MonkeyLqj | 来源:发表于2018-08-06 09:18 被阅读0次

    我们使用线程的时候就去创建一个线程,这样实现起来非常简便,但是就会有一个问题:如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间。那么有没有一种办法使得线程可以复用,就是执行完一个任务,并不被销毁,而是可以继续执行其他的任务?在Java中可以通过线程池来达到这样的效果。今天我们就来了解一下Java线程池的相关知识。

    线程池的使用

    在Java1.5中提供了Executor框架用于把任务的提交和执行解耦,任务的提交交给Runnable或者Callable,而 Executor框架用来处理任务。Executor框架中最核心的成员就是 ThreadPoolExecutor,它是线程池的核心实现类。我们可以通过ThreadPoolExecutor来创建一个线程池。

     public ThreadPoolExecutor(int corePoolSize,    
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler) {...}
    
    • corePoolSize:核心线程池数量,默认情况下,核心线程会在线程池中一直存活,即使它们处于闲置状态。
    • maximumPoolSize:最大线程数量,当活动线程数达到这个数值后,后续的新任务将会被阻塞。
    • keepAliveTime:非核心线程闲置的超时时间,超过这个时间则回收。如果任务很多,并且每个任务的执行事件很短,则可以调大keepAliveTime来提高线程的利用率。如果设置allowCoreThreadTimeOut属性为true时,keepAliveTime也会应用到核心线程上。
    • TimeUnit:keepAliveTime参数的时间单位。可选的单位有天(DAYS)、小时(HOURS)、分钟 (MINUTES)、秒(SECONDS)、毫秒(MILLISECONDS)等。
    • workQueue:保存待执行任务的阻塞队列,如果当前线程数大于corePoolSize,则将任务添加到此任务队列中,也就是阻塞队列。
    • ThreadFactory:线程工厂,为线程池提供创建新线程的功能,ThreadFactory是一个接口,只有一个方法:newThread(Runnable r)。
    • RejectedExecutionHandler:饱和策略,当任务队列和线程池都满了时所采取的应对策略;
      (1). AbordPolicy 无法处理新任务,并抛出RejectedExecutionException异常
      (2). CallerRunsPolicy 用调用者所在的线程来处理任务。此策略提供简单的反馈控制机制,能够减缓 新任务的提交速度
      (3). DiscardPolicy 不能执行的任务,并将该任务删除。
      (4). DiscardOldestPolicy 丢弃队列最近的任务,并执行当前的任务。

    ThreadPoolExecutor使用示例

       /**
         * 通过ThreadPoolExecutor来创建一个线程池
         */
        private static void TestThreadPoolExecutor() {
            ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS,
                    new ArrayBlockingQueue<Runnable>(5));
    
            for(int i=0;i<15;i++){
                MyTask myTask = new MyTask(i);
                executor.execute(myTask);
                System.out.println("线程池中线程数目:"+executor.getPoolSize()+",队列中等待执行的任务数目:"+
                        executor.getQueue().size()+",已执行玩别的任务数目:"+executor.getCompletedTaskCount());
            }
            executor.shutdown();
        }
         private static class MyTask implements Runnable {
            private int taskNum;
             MyTask(int num) {
                this.taskNum = num;
            }
            @Override
            public void run() {
                System.out.println("正在执行task " + taskNum);
                try {
                    Thread.currentThread().sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("task " + taskNum + "执行完毕");
            }
        }
    //    执行结果:
    //    正在执行task 0
    //    线程池中线程数目:1,队列中等待执行的任务数目:0,已执行玩别的任务数目:0
    //    线程池中线程数目:2,队列中等待执行的任务数目:0,已执行玩别的任务数目:0
    //    线程池中线程数目:3,队列中等待执行的任务数目:0,已执行玩别的任务数目:0
    //    正在执行task 1
    //    线程池中线程数目:4,队列中等待执行的任务数目:0,已执行玩别的任务数目:0
    //    正在执行task 2
    //    线程池中线程数目:5,队列中等待执行的任务数目:0,已执行玩别的任务数目:0
    //    线程池中线程数目:5,队列中等待执行的任务数目:1,已执行玩别的任务数目:0
    //    线程池中线程数目:5,队列中等待执行的任务数目:2,已执行玩别的任务数目:0
    //    线程池中线程数目:5,队列中等待执行的任务数目:3,已执行玩别的任务数目:0
    //    线程池中线程数目:5,队列中等待执行的任务数目:4,已执行玩别的任务数目:0
    //    正在执行task 3
    //    线程池中线程数目:5,队列中等待执行的任务数目:5,已执行玩别的任务数目:0
    //    正在执行task 4
    //    线程池中线程数目:6,队列中等待执行的任务数目:5,已执行玩别的任务数目:0
    //    线程池中线程数目:7,队列中等待执行的任务数目:5,已执行玩别的任务数目:0
    //    正在执行task 10
    //    线程池中线程数目:8,队列中等待执行的任务数目:5,已执行玩别的任务数目:0
    //    线程池中线程数目:9,队列中等待执行的任务数目:5,已执行玩别的任务数目:0
    //    线程池中线程数目:10,队列中等待执行的任务数目:5,已执行玩别的任务数目:0
    //    正在执行task 11
    //    正在执行task 12
    //    正在执行task 13
    //    正在执行task 14
    //    task 14执行完毕
    //    task 2执行完毕
    //    task 13执行完毕
    //    task 12执行完毕
    //    task 11执行完毕
    //    task 10执行完毕
    //    正在执行task 8
    //    正在执行task 9
    //    正在执行task 7
    //    正在执行task 5
    //    task 4执行完毕
    //    task 3执行完毕
    //    正在执行task 6
    //    task 0执行完毕
    //    task 1执行完毕
    //    task 8执行完毕
    //    task 7执行完毕
    //    task 5执行完毕
    //    task 6执行完毕
    //    task 9执行完毕
    
    

    从执行结果可以看出,当线程池中线程的数目大于5时,便将任务放入任务缓存队列里面,当任务缓存队列满了之后,便创建新的线程。如果上面程序中,将for循环中改成执行20个任务,就会抛出任务拒绝异常了(java.util.concurrent.RejectedExecutionException)。

    线程池的种类

    在java中,并不提倡我们直接使用ThreadPoolExecutor,而是使用Executors类中提供的几个静态方法来创建线程池。,其中有 4 种线程池比较常用,它们分别是 FixedThreadPool、CachedThreadPool、SingleThreadExecutor和 ScheduledThreadPool。

    1. FixedThreadPool
      是一个重用固定线程数的线程池,可控制线程最大并发数,超出的线程会在队列中等待。
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
    

    示例

     ExecutorService executorService = Executors.newFixedThreadPool(2);
     for (int i = 0; i < 5; i++) {
                MyTask myTask = new MyTask(i);
                executorService.execute(myTask);
            }
     executorService.shutdown();
    
    //    正在执行task 0
    //    正在执行task 1
    //    task 1执行完毕
    //    task 0执行完毕
    //    正在执行task 3
    //    正在执行task 2
    //    task 3执行完毕
    //    task 2执行完毕
    //    正在执行task 4
    //    task 4执行完毕
    

    FixedThreadPool的corePoolSize和maximumPoolSize都设置为创建FixedThreadPool指定的参数nThreads, 也就意味着FixedThreadPool只有核心线程,并且数量是固定的,没有非核心线程。keepAliveTime设置为0L 意味着多余的线程会被立即终止。因为不会产生多余的线程,所以keepAliveTime是无效的参数。另外,任 务队列采用了无界的阻塞队列LinkedBlockingQueue。当执行execute方法时,如果当前运行的线程未达到corePoolSize(核心线程数)时 就创建核心线程来处理任务,如果达到了核心线程数则将任务添加到LinkedBlockingQueue中。 FixedThreadPool就是一个有固定数量核心线程的线程池,并且这些核心线程不会被回收。当线程数超过 corePoolSize 时,就将任务存储在任务队列中;当线程池有空闲线程时,则从任务队列中去取任务执行。

    1. CachedThreadPool
      newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
      public static ExecutorService newCachedThreadPool() {
           return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                         60L, TimeUnit.SECONDS,
                                         new SynchronousQueue<Runnable>());
       }
    

    示例:

     ExecutorService executorService = Executors.newCachedThreadPool();
     for (int i = 0; i < 5; i++) {
                MyTask myTask = new MyTask(i);
                executorService.execute(myTask);
            }
     executorService.shutdown();
    
    //    正在执行task 1
    //    正在执行task 0
    //    正在执行task 2
    //    正在执行task 3
    //    正在执行task 4
    //    task 0执行完毕
    //    task 1执行完毕
    //    task 4执行完毕
    //    task 2执行完毕
    //    task 3执行完毕
    

    CachedThreadPool的corePoolSize为0,maximumPoolSize设置为Integer.MAX_VALUE,这意味着 CachedThreadPool没有核心线程,非核心线程是无界的。keepAliveTime设置为60L,则空闲线程等待新任务 的最长时间为 60s。在此用了阻塞队列 SynchronousQueue,它是一个不存储元素的阻塞队列,每个插入操作 必须等待另一个线程的移除操作,同样任何一个移除操作都等待另一个线程的插入操作。
    当执行execute方法时,首先会执行SynchronousQueue的offer方法来提交任务,并且查询线程池中是否 有空闲的线程执行SynchronousQueue的poll方法来移除任务。如果有则配对成功,将任务交给这个空闲的线 程处理;如果没有则配对失败,创建新的线程去处理任务。当线程池中的线程空闲时,它会执行 SynchronousQueue的poll方法,等待SynchronousQueue中新提交的任务。如果超过 60s 没有新任务提交到 SynchronousQueue,则这个空闲线程将终止。因为maximumPoolSize 是无界的,所以如果提交的任务大于线 程池中线程处理任务的速度就会不断地创建新线程。另外,每次提交任务都会立即有线程去处理。所以, CachedThreadPool 比较适于大量的需要立即处理并且耗时较少的任务。

    1. SingleThreadExecutor
      创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
    public static ExecutorService newSingleThreadExecutor() {
            return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>()));
        }
    

    示例:

    ExecutorService executorService = Executors.newSingleThreadExecutor();
    for (int i = 0; i < 5; i++) {
        MyTask myTask = new MyTask(i);
        executorService.execute(myTask);
    }
    executorService.shutdown();
    
    //        正在执行task 0
    //        task 0执行完毕
    //        正在执行task 1
    //        task 1执行完毕
    //        正在执行task 2
    //        task 2执行完毕
    //        正在执行task 3
    //        task 3执行完毕
    //        正在执行task 4
    //        task 4执行完毕
    

    corePoolSize和maximumPoolSize都为1,意味着SingleThreadExecutor只有一个核心线程,其他的参数都 和FixedThreadPool一样。
    当执行execute方法时,如果当前运行的线程数未达到核心线程数,也就是当前没有运行的线程,则创 建一个新线程来处理任务。如果当前有运行的线程,则将任务添加到阻塞队列LinkedBlockingQueue中。因 此,SingleThreadExecutor能确保所有的任务在一个线程中按照顺序逐一执行。

    1. ScheduledThreadPool
      ScheduledThreadPool是一个能实现定时和周期性任务的线程池。
      public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
            return new ScheduledThreadPoolExecutor(corePoolSize);
        }
    
     /**
      * 构造方法
      */
     public ScheduledThreadPoolExecutor(int corePoolSize) {
            super(corePoolSize, Integer.MAX_VALUE,
                  DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
                  new DelayedWorkQueue());
        }
    

    这里创建了ScheduledThreadPoolExecutor,ScheduledThreadPoolExecutor继承自ThreadPoolExecutor,它 主要用于给定延时之后的运行任务或者定期处理任务。

    ScheduledThreadPoolExecutor 的构造方法最终调用的是ThreadPoolExecutor的 构造方法。corePoolSize是传进来的固定数值,maximumPoolSize的值是Integer.MAX_VALUE。因为采用的 DelayedWorkQueue是无界的,所以maximumPoolSize这个参数是无效的。
    示例:

      ExecutorService executorService = Executors.newScheduledThreadPool(2);
      for (int i = 0; i < 5; i++) {
          MyTask myTask = new MyTask(i);
          executorService.execute(myTask);
      }
      executorService.shutdown();
    
    //        正在执行task 1
    //        正在执行task 0
    //        task 1执行完毕
    //        task 0执行完毕
    //        正在执行task 2
    //        正在执行task 3
    //        task 2执行完毕
    //        task 3执行完毕
    //        正在执行task 4
    //        task 4执行完毕
    

    当执行 ScheduledThreadPoolExecutor 的 scheduleAtFixedRate 或者 scheduleWithFixedDelay方法时,会向 DelayedWorkQueue 添加一个 实现 RunnableScheduledFuture 接口的ScheduledFutureTask(任务的包装类), 并会检查运行的线程是否达到 corePoolSize。如果没有则新建线程并启动它,但并不是立即去执行任务,而 是去 DelayedWorkQueue 中取ScheduledFutureTask,然后去执行任务。如果运行的线程达到了corePoolSize 时,则将任务添加到DelayedWorkQueue中。DelayedWorkQueue会将任务进行排序,先要执行的任务放在队 列的前面。其跟此前介绍的线程池不同的是,当执行完任务后,会将ScheduledFutureTask中的time变量改为 下次要执行的时间并放回到DelayedWorkQueue中。

    参考资料
    1.Android进阶之光 刘望舒
    2.https://www.cnblogs.com/dolphin0520/p/3932921.html

    相关文章

      网友评论

          本文标题:Java线程池的使用

          本文链接:https://www.haomeiwen.com/subject/ifkmvftx.html