线程池

作者: 编程的猫 | 来源:发表于2020-03-04 17:49 被阅读0次

    什么是线程池?

    • 线程池本质上是一种对象池,用来对线程进行管理
    • 在执行任务时需要将线程从线程池取出
    • 在任务执行完成后,需要将线程归还给线程池
    • 通过这种复用,可以有效避免直接创建线程带来的负面影响

    使用线程池的好处:

    • 降低资源的消耗,创建线程本来就要占用一定的资源,线程池复用线程可以减少线程创建和销毁带来的资源开销(创建线程会占用一定内存)
    • 提高任务执行的响应速度,不必等到线程创建完成才开始执行
    • 提高线程的可管理性,控制线程的并发数。线程不能无限制地创建,需要进行统一的分配、调优和监控

    ThreadPoolExecutor

    /* 
    *@ ThreadPoolExecutor构造参数介绍 
    *@author SEU_Calvin 
    * @date 2016/09/03 
    */  
    public ThreadPoolExecutor(  
    //核心线程数,除非allowCoreThreadTimeOut被设置为true,否则它闲着也不会死  
    int corePoolSize,   
    //最大线程数,活动线程数量超过它,后续任务就会排队                     
    int maximumPoolSize,   
    //超时时长,作用于非核心线程(allowCoreThreadTimeOut被设置为true时也会同时作用于核心线程),闲置超时便被回收             
    long keepAliveTime,                            
    //枚举类型,设置keepAliveTime的单位,有TimeUnit.MILLISECONDS(ms)、TimeUnit. SECONDS(s)等  
    TimeUnit unit,  
    //缓冲任务队列,线程池的execute方法会将Runnable对象存储起来  
    BlockingQueue<Runnable> workQueue,  
    //线程工厂接口,只有一个new Thread(Runnable r)方法,可为线程池创建新线程  
    ThreadFactory threadFactory)
    

    ThreadPoolExecutor的各个参数所代表的特性注释中已经写的很清楚了,那么ThreadPoolExecutor执行任务时的心路历程是什么样的呢?(以下用currentSize表示线程池中当前线程数量)

    • 当currentSize<corePoolSize时,没什么好说的,直接启动一个核心线程并执行任务。
    • 当currentSize>=corePoolSize、并且workQueue未满时,添加进来的任务会被安排到workQueue中等待执行。
    • 当workQueue已满,但是currentSize<maximumPoolSize时,会立即开启一个非核心线程来执行任务。
    • 当currentSize>=corePoolSize、workQueue已满、并且currentSize>maximumPoolSize时,调用handler默认抛出RejectExecutionExpection异常。


      处理流程
      执行流程
    RejectedExecutionHandler:饱和策略。

    饱和策略有四种:

    • AbordPolicy:无法处理新任务,并抛出RejectedExecutionException异常。
    • CallerRunsPolicy:用调用者所在的线程来处理任务。此策略提供简单的反馈控制机制,能够减缓新任务的提交速度。
    • DiscardPolicy:不能执行的任务,并将该任务删除。
    • DiscardOldestPolicy:丢弃队列最近的任务,并执行当前的任务。

    四种常用的线程池

    首先来看看这四种线程池的适用场景:
    • CachedThreadPool:用来创建一个可以无限扩大的线程池,适用于负载较轻的场景,执行短期异步任务。(可以使得任务快速得到执行,因为任务时间执行短,可以很快结束,也不会造成cpu过度切换)。CachedThreadPool只有非核心线程,最大线程数非常大,所有线程都活动时,会为新任务创建新线程,否则利用空闲线程(60s空闲时间,过了就会被回收,所以线程池中有0个线程的可能)处理任务。
    • FixedThreadPool:创建一个固定大小的线程池,因为采用无界的阻塞队列,所以实际线程数量永远不会变化,适用于负载较重的场景,对当前线程数量进行限制。(保证线程数可控,不会造成线程过多,导致系统负载更为严重)。FixThreadPool只有核心线程,并且数量固定的,也不会被回收,所有线程都活动时,因为队列没有限制大小,新任务会等待执行。
    • SingleThreadExecutor:创建一个单线程的线程池,适用于需要保证顺序执行各个任务。SingleThreadPool只有一个核心线程,确保所有任务都在同一线程中按顺序完成。因此不需要处理线程同步的问题。
    • ScheduledThreadPool:适用于执行延时或者周期性任务。核心线程数固定,非核心线程(闲着没活干会被立即回收)数没有限制

    通过直接或间接的配置ThreadPoolExecutor的参数可以创建不同类型的ThreadPoolExecutor

    FixedThreadPool是可重用的固定线程数的线程池,在Executors类中提供了创建FixedThreadPool的方法:
    public static ExecutorService newFixedThreadPool(int nThreads){
        return new ThreadPoolExecutor(nThreads,nThreads,
                                                            0L,TimeUnit.MILLISECONDS,
                                                            new LinkedBlockingQueue<Runnable>());
    }
    
    • FixedThreadPool的corePoolSize和maximumPoolSize都设置为创建FixedThreadPool指定的参数nThreads,意味着FixedThreadPool只有核心线程,并且数量是固定的,没有非核心线程。keepAliveTime设置为0L,意味着多余的线程会被立即终止。因为不会产生多余的线程,所以KeepAliveTime是无效的参数。任务队列采用了无界的阻塞队列LinkedBlockingQueue。FixedThreadPool的execute方法执行图如下:


      FixedThreadPool线程池执行流程

    由图分析可知:
    当执行execute方法时,如果当前运行的线程未达到corePoolSize(核心线程数)时就创建核心线程来处理任务,如果达到了核心线程数则将任务添加到LinkedBlockingQueue中。FixedThreadPool就是一个有固定数量核心线程的线程池,并且这些核心线程不会被回收。当线程超过corePoolSize时,就将任务存储在任务队列中。当线程池有空闲线程时,则从任务队列中去取任务执行。

    CachedThreadPool是一个根据需要创建线程的线程池。线程数量原则上不设上限

    在Executors类中创建CachedThreadPool的方法如下:

    public static ExecutorService newCachedThreadPool(){
        return new ThreadPoolExecutor(0,Integer.MAX_VALUE,
                                                            60L,TimeUnit.SECONDS,
                                                            new SynchronousQueue<Runnable>());
    }
    

    CachedThreadPool的corePoolSize为0,maximumPoolSize设置为Integer.MAX_VALUE,意味着CachedThreadPool没有核心线程,非核心线程是无界的。KeepAliveTime设置为60L,则空闲线程等待新任务的最长时间为60s。在此用了阻塞队列SynchronousQueue,它是一个不存储元素的阻塞队列,每个插入操作必须等待另一个线程的移除操作,同样任何一个移除操作都等待另一个线程的插入操作。


    CachedThreadPool线程池执行流程图

    由图分析可知,当执行execute方法时,首先会执行SynchronousQueue的offer方法来提交任务,并查询线程池中是否有空闲的线程执行SynchronousQueue的poll方法来移除任务。如果有则配对成功,将任务交给这个空闲的线程处理。如果没有则配对失败,创建新的线程去处理任务。当线程池中的线程空闲时,它会执行SynchronousQueue的poll方法,等待SynchronousQueue中新提交的任务。如果超过了60s没有新任务提交到SynchronousQueue,则这个空闲线程将终止。因为maximumPoolSize是无界的,所以如果提交的任务大于线程池中线程处理任务的速度就会不断的创建新线程。另外,每次提交任务都会立即有线程去处理。所以,CachedThreadPool比较适于大量的需要立即处理并且耗时较少的任务。

    SingleThreadExecutor是使用单个工作线程的线程池,其创建代码如下
    public static ExecutorService newSingleThreadExecutor{
        return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1,1,
                                                                                        0L,TimeUnit.MILLISECONDS,
                                                                                        new LinkedBlockingQueue<Runnable>()));
    }
    

    corePoolSize和maximumPoolSize都为1,意味着SingleThreadExecutor只有一个核心线程,其他的参数都和FixedThreadPool一样。


    SingleThreadExecutor执行的流程示意图

    由图分析,可知:
    当执行execute方法时,如果当前运行的线程数未达到核心线程数,也就是当前没有运行的线程,则创建一个新线程来处理任务。如果当前有运行的线程,则将任务添加到阻塞队列LinkedBlockingQueue中。因此,SingleThreadExecutor能确保所有的任务在一个线程中按照顺序逐一执行。

    ScheduledThreadPool是一个能实现定时和周期性任务的线程池。创建代码如下:
    public staic ScheduledExecutorService newScheduledThreadPool(int corePoolSize){
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }
    

    这里创建了ScheduledThreadPoolExecutor,ScheduledThreadPoolExecutor继承自ThreadPoolExecutor,它主要用于给定延时以后运行的任务或者定期处理任务。ScheduledThreadPoolExecutor的构造方法如下:

    public ScheduledThreadPoolExecutor(int corePoolSize){
        super(corePoolSize,Integer.MAX_VALUE,
                    DEFAULT_KEEPALIVE_MILLIS,MILLISECONDS,
                    new DelayedWorkQueue());
    }
    

    ScheduledThreadPoolExecutor的构造方法最终调用的是ThreadPoolExecutor的构造方法。corePoolSize是传进来的固定数值,maximumPoolSize的值是Integer.MAX_VALUE。因为采用的DelayedWorkQueue是无界的,所以maximumPoolSize这个参数是无效的。


    ScheduledThreadPoolExecutor的execute方法的执行示意图

    由图分析,可知:
    当执行ScheduledThreadPoolExecutor的scheduleAtFixedRate或者scheduleWithFixDelay方法时,会向DelayedWorkQueue添加一个实现RunnableScheduledFuture接口的ScheduledFutureTask(任务的包装类),并会检查运行的线程是否达到corePoolSize。如果没有则新建线程并启动它,但不是立即去执行任务,而是去DelayedWorkQueue中取出ScheduledFutureTask,然后去执行任务。如果运行的线程达到了corePoolSize时,则将任务添加到DelayedWorkQueue中。DelayedWorkQueue会将任务进行排序,先要执行的任务放在队列的前面。其和上面介绍的几个线程池不同的是,当执行完任务后,会将ScheduledFutureTask的time变量改为下次要执行的时间并放回到DelayedWorkQueue中。

    线程池为什么要用(阻塞)队列?

    • 因为线程若是无限制的创建,可能会导致内存占用过多而产生OOM,并且会造成cpu过度切换
    • 创建线程池的消耗较高。线程池创建线程需要获取mainlock这个全局锁,影响并发效率,阻塞队列可以很好的缓冲

    线程池为什么要使用阻塞队列而不使用非阻塞队列?

    阻塞队列可以保证任务队列中没有任务时阻塞获取任务的线程,使得线程进入wait状态,释放cpu资源。当队列中有任务时才唤醒对应线程从队列中取出消息进行执行。使得在线程不至于一直占用cpu资源。

    线程执行完任务后通过循环再次从任务队列中取出任务进行执行,代码片段如下

    while (task != null || (task = getTask()) != null) {})。
    

    不用阻塞队列也是可以的,不过实现起来比较麻烦而已

    如何选型线程池

    • CPU密集型任务
      尽量使用较小的线程池,一般为CPU核心数+1。 因为CPU密集型任务使得CPU使用率很高,若开过多的线程数,会造成CPU过度切换。
    • IO密集型任务
      可以使用稍大的线程池,一般为2*CPU核心数。 IO密集型任务CPU使用率并不高,因此可以让CPU在等待IO的时候有其他线程去处理别的任务,充分利用CPU时间。
    • 混合型任务
      可以将任务分成IO密集型和CPU密集型任务,然后分别用不同的线程池去处理。 只要分完之后两个任务的执行时间相差不大,那么就会比串行执行来的高效。因为如果划分之后两个任务执行时间有数据级的差距,那么拆分没有意义。因为先执行完的任务就要等后执行完的任务,最终的时间仍然取决于后执行完的任务,而且还要加上任务拆分与合并的开销,得不偿失。
    execute()和submit()方法
    • execute(),执行一个任务,没有返回值。
    • submit(),提交一个线程任务,有返回值。
      submit(Callable<T> task)能获取到它的返回值,通过future.get()获取(阻塞直到任务执行完)。一般使用FutureTask+Callable配合使用(IntentService中有体现)。
      submit(Runnable task, T result)能通过传入的载体result间接获得线程的返回值。
      submit(Runnable task)则是没有返回值的,就算获取它的返回值也是null。
      Future.get方法会使取结果的线程进入阻塞状态,知道线程执行完成之后,唤醒取结果的线程,然后返回结果。

    创建线程池示例,这里没有添加自定义的饱和策略

    public class TgExecutor {
    
        private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();
        private static final int CORE_POOL_SIZE = CPU_COUNT + 1;
        private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;
        private static final int KEEP_ALIVE = 1;
    
        private static final ThreadFactory sThreadFactory = new ThreadFactory() {
            private final AtomicInteger mCount = new AtomicInteger(1);
    
            public Thread newThread(Runnable r) {
                return new Thread(r, "TgThread #" + mCount.getAndIncrement());
            }
        };
        //阻塞队列   capacity阻塞队列的容量
        public static final BlockingQueue<Runnable> sPoolWorkQueue =
                new LinkedBlockingQueue<Runnable>(128);
        
        /**
         * An {@link Executor} that can be used to execute tasks in parallel.
         */
        private static final Executor THREAD_POOL_EXECUTOR
                = new ThreadPoolExecutor(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE,
                TimeUnit.SECONDS, sPoolWorkQueue, sThreadFactory);
    
        public static Executor getExecutor() {
            return THREAD_POOL_EXECUTOR;
        }
    
    }
    

    参考相关博客:
    https://www.cnblogs.com/1925yiyi/p/9040605.html
    https://blog.csdn.net/qq_39969226/article/details/88141264

    相关文章

      网友评论

        本文标题:线程池

        本文链接:https://www.haomeiwen.com/subject/qsvblhtx.html