美文网首页
Java线程池

Java线程池

作者: Lebens | 来源:发表于2018-05-09 15:41 被阅读0次

    本文主要介绍java线程池相关的内容,包括线程池的几种常用方式

    线程池存在的意义

    先抛一个问题,Thread能直接新建并使用,为什么还要使用线程池?

    如果看过我之前写的博客的话可以知道,一个线程从新建到死亡,总共有5个状态。创建、运行包括死亡都会消耗cpu等系统资源。如果子线程每次执行的任务量很小,但是数量很大时,你会发现,基本上所有的系统资源都消耗在线程管理上。同时线程之间的执行权竞争也会消耗一定的系统资源,这就导致了程序的执行效率降低。另外还有一点就是,随意新建的线程无法做到统一管理。线程池的存在就是为了解决上面提到的问题,以提高程序的执行效率。

    Executor

    Executor其实更准确的解释是执行器,定义了一个线程执行的规范,跟我们常说的线程池相差有点远。

    An object that executes submitted Runnable tasks. This interface provides a way of decoupling task submission from the mechanics of how each task will be run, including details of thread use, scheduling, etc. An Executor is normally used instead of explicitly creating threads.

    这段话的意思是,Executor是用来执行被提交的Runnable任务的。这个接口提供了一种将任务提交与每个任务如何运行的机制解耦的方式,包括线程使用,调度等细节。通常Executor用来代替显示的创建线程。

    上面是官方给的注释,那Executor这跟我们今天要说的线程有什么关系呢。

    其实Executor是线程池的父类,最终的实现类有两个ThreadPoolExecutor以及ScheduledThreadPoolExecutor,其中ThreadPoolExecutor就是我们常说的线程池。

    在ThreadPoolExecutor之上还有一个接口ExecutorService,定义了任务提交、执行等一些相关接口,继承结构图如下所示:

    线程池继承结构图.jpeg

    ExecutorService

    相对于Executor,ExecutorService更多是功能的定义,提供了诸如

    1. <T> Future<T> submit(Callable<T> task);
    2. Future<?> submit(Runnable task);

    的方法,可以让线程执行返回结果。

    ThreadPoolExecutor

    这个才是我们常用的“线程池”,主要来看一下构造函数。

    ThreadPoolExecutor有4个构造函数,

    public ThreadPoolExecutor(int corePoolSize,                             
                              int maximumPoolSize,                          
                              long keepAliveTime,                           
                              TimeUnit unit,                                
                              BlockingQueue<Runnable> workQueue) {          
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, 
             Executors.defaultThreadFactory(), defaultHandler);             
    }
    
    ...
    
    public ThreadPoolExecutor(int corePoolSize,                             
                              int maximumPoolSize,                          
                              long keepAliveTime,                           
                              TimeUnit unit,                                
                              BlockingQueue<Runnable> workQueue,            
                              ThreadFactory threadFactory) {                
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, 
             threadFactory, defaultHandler);                                
    }                                                                       
    
    ...
    
    public ThreadPoolExecutor(int corePoolSize,                             
                              int maximumPoolSize,                          
                              long keepAliveTime,                           
                              TimeUnit unit,                                
                              BlockingQueue<Runnable> workQueue,            
                              RejectedExecutionHandler handler) {           
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, 
             Executors.defaultThreadFactory(), handler);                    
    } 
    
    ... 
    
    public ThreadPoolExecutor(int corePoolSize,                            
                              int maximumPoolSize,                         
                              long keepAliveTime,                          
                              TimeUnit unit,                               
                              BlockingQueue<Runnable> workQueue,           
                              ThreadFactory threadFactory,                 
                              RejectedExecutionHandler handler) {          
        if (corePoolSize < 0 ||                                            
            maximumPoolSize <= 0 ||                                        
            maximumPoolSize < corePoolSize ||                              
            keepAliveTime < 0)                                             
            throw new IllegalArgumentException();                          
        if (workQueue == null || threadFactory == null || handler == null) 
            throw new NullPointerException();                              
        this.acc = System.getSecurityManager() == null ?                   
                null :                                                     
                AccessController.getContext();                             
        this.corePoolSize = corePoolSize;                                  
        this.maximumPoolSize = maximumPoolSize;                            
        this.workQueue = workQueue;                                        
        this.keepAliveTime = unit.toNanos(keepAliveTime);                  
        this.threadFactory = threadFactory;                                
        this.handler = handler;                                            
    }                                                                                                                                                                                                                   
    

    最后都是调用到了参数最长的那个,我们来看一下里面个各个参数的意思:

    • corePoolSize : 核心线程的数量
    • maximumPoolSize : 最大线程数量,包括核心线程和非核心线程
    • keepAliveTime : 非核心线程空闲时存活的时间
    • unit : 存活时间的时间单位
    • workQueue : 任务队列
    • threadFactory : 创建线程的工厂类
    • handler : 当任务无法被处理的对象

    线程数量

    关于corePoolSize、maximumPoolSize和workQueue这里要单独拿出来讲一下。

    首先要明确两个概念:核心线程非核心线程

    核心线程是指线程空闲时也保存的线程池中的线程,最大数量就是corePoolSize个,当然如果设置了allowCoreThreadTimeOut,那么核心线程也会被回收。

    非核心线程是指核心线程不够处理任务,同时线程池稍微到达maximumPoolSize时添加的线程,这些线程会在空闲keepAliveTime后被回收。

    但是保存在线程池中的线程并没有真正的核心线程和非核心线程的区别,只是开始回收线程时,线程数量达到核心线程的数量便不再回收。

    知道上面这个两个我们就可以接着往下讲了。

    什么时候新建核心线程

    当线程池的线程数量少于corePoolSize时,不管是否有核心线程空闲,只要来新任务,都是直接新建核心线程用来出来任务。

    什么时候新建非核心线程

    这个问题比较复杂一点,因为涉及到workQueue的类型以及容量问题,直接一点的的回答就是:

    • 当workQueue被塞满时,这时来新的任务才会新建非核心线程进行处理,如果线程池中的线程数量达到了maximumPoolSize,这时再来新任务将被拒绝,同时抛出一个RejectedExecutionException。

    • 但是存在一个特殊的情况,那就是设置的核心线程数量为0,这个时候会新建一个非核心线程用于处理任务

    什么时候将任务放入workQueue

    当线程池中的核心线程达到了corePoolSize,同时没有核心线程空闲,这时来新的任务都会被放入workQueue,当workQueue被塞满时就会尝试新建非核心线程。

    所以我们可以看到,一个线程池中线程池中能存在的最大线程个数就是maximumPoolSize,但是能保存的最大任务数就会比这个大一点,是maximumPoolSize + workQueue.size() 之和。

    大致的流程下图所示:

    线程池任务分配.png

    下面是demo验证一下正常情况:

    public class ExecutorTest {
    
        public static void main(String[] args) {
    
    try {
    
                ThreadPoolExecutor executor = new ThreadPoolExecutor(1, Integer.MAX_VALUE, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
    
                for (int i = 0; i < 10; i++) {
    
                    final int index = i;
    
                    executor.execute(() -> {
                        try {
                            Thread.sleep(200);
                            System.out.println("runnable index is " + index
                                    + " and time is " + System.currentTimeMillis());
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    });
                }
    
    
                //workers 在线程池中用保存有新建完的线程,由于不存在直接获取的方法,这里用反射获取
                Class clazz = executor.getClass();
                Field workers = clazz.getDeclaredField("workers");
                workers.setAccessible(true);
    
                while (true) {
    
                    HashSet hashSet = (HashSet) workers.get(executor);
                    System.out.println("Queue size is " + executor.getQueue().size()
                            + " and thread count is " + hashSet.size());
    
    
                    Thread.sleep(1000);
                }
            } catch (Exception ignore) {
    
            }
        }
    }
    

    打印如下:

    time is 1525846211057 and thread count is 1
    runnable index is 0 and time is 1525846211260
    runnable index is 1 and time is 1525846211464
    runnable index is 2 and time is 1525846211665
    runnable index is 3 and time is 1525846211870
    time is 1525846212062 and thread count is 1
    runnable index is 4 and time is 1525846212074
    time is 1525846213064 and thread count is 1
    time is 1525846214069 and thread count is 1
    time is 1525846215070 and thread count is 1
    time is 1525846216074 and thread count is 1
    time is 1525846217076 and thread count is 1
    time is 1525846218079 and thread count is 1
    time is 1525846219084 and thread count is 1
    time is 1525846220089 and thread count is 1
    time is 1525846221092 and thread count is 1
    

    由于没有提供直接获取线程池中当前线程数量方法,用反射的方法来获取。

    可以看到,就算非核心线程数量设置到了Integer.MAX_VALUE,但是任务队列并没有满,还是只有1个核心线程在执行任务。并且过了超时时间,线程并没有被销毁。

    再来一个demo验证一下核心线程设置为0的情况:

    public class ExecutorTest {
    
        public static void main(String[] args) {
    
    try {
    
                ThreadPoolExecutor executor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
    
                for (int i = 0; i < 10; i++) {
    
                    final int index = i;
    
                    executor.execute(() -> {
                        try {
                            Thread.sleep(200);
                            System.out.println("runnable index is " + index
                                    + " and time is " + System.currentTimeMillis());
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    });
                }
    
    
                //workers 在线程池中用保存有新建完的线程,由于不存在直接获取的方法,这里用反射获取
                Class clazz = executor.getClass();
                Field workers = clazz.getDeclaredField("workers");
                workers.setAccessible(true);
    
                while (true) {
    
                    HashSet hashSet = (HashSet) workers.get(executor);
                    System.out.println("Queue size is " + executor.getQueue().size()
                            + " and thread count is " + hashSet.size());
    
    
                    Thread.sleep(1000);
                }
            } catch (Exception ignore) {
    
            }
        }
    }
    

    打印如下:

    time is 1525846316180 and thread count is 1
    runnable index is 0 and time is 1525846316383
    runnable index is 1 and time is 1525846316589
    runnable index is 2 and time is 1525846316793
    runnable index is 3 and time is 1525846316999
    time is 1525846317184 and thread count is 1
    runnable index is 4 and time is 1525846317201
    time is 1525846318188 and thread count is 1
    time is 1525846319193 and thread count is 1
    time is 1525846320197 and thread count is 1
    time is 1525846321198 and thread count is 1
    time is 1525846322202 and thread count is 1
    time is 1525846323206 and thread count is 0
    time is 1525846324206 and thread count is 0
    time is 1525846325211 and thread count is 0
    

    当核心线程设置为0时,提交任务给线程池,会新建一个非核心线程用于处理任务,过了超时时间后线程就被回收了。

    Executors提供的4种默认线程池

    Executors是一个线程池的工具类,最主要的用处就是给我们提供了4种常用的线程池的创建方式。

    newFixedThreadPool(int nThreads)

    这个静态返回一个固定线程数量的线程池,来看一下里面的实现。

    public static ExecutorService newFixedThreadPool(int nThreads) {          
        return new ThreadPoolExecutor(nThreads, nThreads,                     
                                      0L, TimeUnit.MILLISECONDS,              
                                      new LinkedBlockingQueue<Runnable>());   
    }                                                                         
    

    这里就开始用到文章上面所解释的参数的意义,这里可以看到,核心线程和最大线程数量都为指定的nThreads,非核心线程超市时间为0,也就是默认值,任务队列为一个LinkedBlockingQueue。

    解释一下为什么这是一个固定线程的线程池,结合上面的任务分配图可以很清楚的知道,

    1. 当新任务来时,最开始肯定是新建核心线程进行任务直接处理
    2. 当核心线程满了之后就是任务入队了,但是LinkedBlockingQueue理论上是一个无限大的队列,可以一直存放任务,这个线程池中不会存在非核心线程,线程的数量也就固定了。

    newSingleThreadExecutor()

    这个就是新建一个只有一个核心线程的线程池。

    public static ExecutorService newSingleThreadExecutor() {              
        return new FinalizableDelegatedExecutorService                     
            (new ThreadPoolExecutor(1, 1,                                  
                                    0L, TimeUnit.MILLISECONDS,             
                                    new LinkedBlockingQueue<Runnable>())); 
    }                                                                      
    

    这个就不多解释了,看上面就知道了,只是把核心线程数指定为了1。

    newCachedThreadPool()

    这就是不限制线程的数量,但是线程会被回收。

    public static ExecutorService newCachedThreadPool() {                
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,              
                                      60L, TimeUnit.SECONDS,             
                                      new SynchronousQueue<Runnable>()); 
    }                                                                    
    

    还是套用上面的任务分配图:

    1. 核心线程数量为0,以为这所有的任务都先进任务队列
    2. 但是SynchronousQueue是一个同步的队列,也就是SynchronousQueue中不会存在任务,只要有新任务都是直接分配出去处理。
    3. 最大线程数量为Integer.MAX_VALUE,意味着可以处理的任务是无限量的。
    4. 看完上面几点可以知道,当有任务来时都是交给非核心线程处理的,线程数量不够时直接新建非核心线程。所有线程空闲超过60秒将会被回收,这也是Cached的含义。

    newScheduledThreadPool(int corePoolSize)

    这个线程池主要用于执行需要重复执行的任务上,可以指定任务循环执行的模式。

    public static ScheduledExecutorService newScheduledThreadPool(          
            int corePoolSize, ThreadFactory threadFactory) {                
        return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
    }                                                                       
    
    public ScheduledThreadPoolExecutor(int corePoolSize) {                  
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,              
              new DelayedWorkQueue());                                      
    }                                                                       
    

    可以看到最后其实是新建了一个ThreadPoolExecutor,设置的核心线程数为corePoolSize,队列为一个无限大小的延时队列。

    三个方法说明一下:

    1. schedule(Runnable command,long delay,TimeUnit unit):就是一个只执行一次的延时任务,这个方法接受3个参数

      • 执行的任务
      • 延时的时间
      • 时间单位
    2. scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit): 这是一个无限次执行的延时任务,这个方法接受4个参数

      • 执行的任务
      • 第一次开始执行的延时
      • 以后每次开始的延时
      • 时间单位
    3. scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit):这是一个无限次执行的延时任务,只不过每次新任务总在上一个任务执行完毕开始,这方法接受4个参数

      • 执行的任务
      • 第一次开始执行的延时
      • 上个任务结束后,开始下个任务的延时
      • 时间单位

    总结

    这篇基本上算都是干货了,总结没啥好写的,再看一遍文章,能记住的都记住吧,特别是文中提到的几个问题。

    相关文章

      网友评论

          本文标题:Java线程池

          本文链接:https://www.haomeiwen.com/subject/qmtorftx.html