美文网首页Java-多线程程序员
Java再回顾(1)—线程池回顾与思考

Java再回顾(1)—线程池回顾与思考

作者: 史小豪 | 来源:发表于2020-06-10 22:12 被阅读0次

    概述

    本文是针对java线程池的回顾与思考,主要是围绕java线程池的思路梳理与知识总结。长文预警(写这篇真的累到爆炸)!!!

    概念

    什么是线程?

    线程是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务。

    相信大家对线程的概念都不会陌生。
    实际上,在日常的开发过程中,也会遇到大量的使用多线程的场景,如异步计算、android新开线程进行耗时操作等。

    如何实现线程?

    通常,实现线程的主要方式有以下两种(当然还有实现callable等等...):
    1.1继承Thread类(java.lang.Thread),实现run方法:

    public class ThreadDemo1 extends Thread {
        public  void run(){
            //do something
        }
    }
    

    1.2实现Runnable接口(java.lang.Runnable),实现run方法:

    public class ThreadDemo2 implements Runnable {
        public  void run(){
            //do something
        }
    }
    

    1.3两种启动方式的对比
    -Thread占据了父类的名额,没有Runnable方便。因为java是单继承。
    -Thread类本身也是实现了Runnable
    如下图所示:

    image.png
    -Runnable启动需要Thread类的支持
    如下图所示:
    image.png
    -Runnable更容易实现多线程中的资源共享
    结论:更建议实现Runnable接口来创建线程

    线程的启动?
    通常都是new 、start二连...
    最原始的做法是有一个任务就new一个线程..
    然而,线程的创建是具有一定开销的,频繁地new线程可能会引起一系列的问题:占用过多资源导致死机、线程间没有互动无法完成协作、对线程缺乏管理.....
    所以,java为我们引入了线程池——ThreadPoolExecutor,来统一的管理线程,方便线程的复用。

    正文

    ThreadPoolExecuto的体系

    可能一百度线程池,会看到什么java四种线程池(FixedThreadPool、CachedThreadPool、SingleThreadExecutor、ScheduledThreadPool),实际它们都是基于ThreadPoolExecutor,只不过参数不同。
    我们先看一下ThreadPoolExecuto的体系:


    ThreadPoolExecutor体系.png

    先看一下Executor接口,如图所示:


    image.png

    它只包含了一个execute方法,实际任务的提交、线程池的关闭等由ExecutorService决定。而继承了ExecutorService的抽象类AbstractExecutorService则实现了一部分接口....这里就不赘述了,直接进入整体。

    ThreadPoolExecuto的构造函数
        public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue) {
            this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                 Executors.defaultThreadFactory(), defaultHandler);
        }
    
    
        public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory) {
            this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                 threadFactory, defaultHandler);
        }
    
        public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  RejectedExecutionHandler handler) {
            this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                 Executors.defaultThreadFactory(), handler);
        }
    
        public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler) {
            if (corePoolSize < 0 ||
                maximumPoolSize <= 0 ||
                maximumPoolSize < corePoolSize ||
                keepAliveTime < 0)
                throw new IllegalArgumentException();
            if (workQueue == null || threadFactory == null || handler == null)
                throw new NullPointerException();
            this.corePoolSize = corePoolSize;
            this.maximumPoolSize = maximumPoolSize;
            this.workQueue = workQueue;
            this.keepAliveTime = unit.toNanos(keepAliveTime);
            this.threadFactory = threadFactory;
            this.handler = handler;
        }
    

    这好像多少有些吓人...
    但是别怕,咱们慢慢来。
    首先,细心的同学可能已经发现,前三个构造器实际都是调用了最后一个构造器,只不过参数有所差异。
    所以,我们先结合系统的javaDoc从共有的参数来讲起:

    corePoolSize

         * @param corePoolSize the number of threads to keep in the pool, even
         *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
    

    核心线程:在创建完线程池后,核心线程先不创建,接到任务后创建核心线程。核心线程即使空闲也依旧会保留在线程池中,除非设置了allowCoreThreadTimeOut。当allowCoreThreadTimeOut设置之后,那么核心线程超时后就会销毁。

    maximumPoolSize

         * @param maximumPoolSize the maximum number of threads to allow in the
         *        pool
    

    线程池最大数量:这个很好理解,根据字面意思就能明白。线程池最大数量=核心线程数量+非核心线程数。

    keepAliveTime

         * @param keepAliveTime when the number of threads is greater than
         *        the core, this is the maximum time that excess idle threads
         *        will wait for new tasks before terminating.
    

    非核心线程的超时时长:就是字面意思,如果非核心线程执行完任务空闲了,等待任务到来的时长超出这个则会回收

    unit

    * @param unit the time unit for the {@code keepAliveTime} argument
    

    这里的unit是指keepAliveTime的计量单位,使用TimeUnit。
    TimeUnit是一个枚举类型,包括微毫秒、微秒一直到天。

    workQueue

     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
    

    workQueue:任务阻塞队列,默认情况下,任务添加进来会先交给核心线程执行,如果没有核心线程空闲,则加入到任务队列中等待执行,任务队列可以设置一个最大值,当达到最大值后则创建非核心线程执行任务。
    常见的workQueue有五种:
    1.SynchronousQueue:同步阻塞队列,不存储元素。一接收到任务,就提交给线程执行,如果无空闲线程,则会创建新的线程。SynchronousQueue的插入操作是阻塞的,每个插入操作必须要等到另一个线程调用移除操作。

    image.png
    如图所示,前面提到的四大线程池之一的CachedThreadPool,就使用了这个队列。由于一有新的任务,就要复用线程/新创建线程,所以如果maximumPoolSize设置太小,就会抛出异常。
    下面手打一段非常简单的代码测试一下:
    public class ThreadPoolTest {
        private static ExecutorService pool;
    
        public static void main(String[] args) throws InterruptedException {
            pool = new ThreadPoolExecutor(0, 2,
                    60L, TimeUnit.SECONDS,
                    new SynchronousQueue<Runnable>());
            for(int i=0;i<10;i++) {
                pool.execute(new ThreadTest());
            }
        }
    }
         class ThreadTest implements Runnable{
    
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName()+"跑跑跑");
            }
        }
    

    异常抛出:


    RejectedExecutionException.png

    所以对于设置SynchronousQueue的pool,要注意maximumPoolSize的设置。如同上面的CachedThreadPool设置为maxsize,实际虽然不会抛出策略异常,但是可能会导致oom。

    2.LinkedBlockingQueue
    这个队列默认情况下是无界的,也就是说在未设置队列容量的情况下,队列容量是最大值,如下图所示。

    image.png

    一有任务到来,如果没有空闲/可创建的核心线程,任务就会被加入到这个队列之中去, 从而使得maximumPoolSize失去作用。
    前面提到的四大线程池之一的FixedThreadPool就采用了这个队列。


    image.png

    3.DelayQueue:队列内元素必须实现Delayed接口,这就意味着你传进去的任务必须先实现Delayed接口。这个队列接收到任务时,首先先入队,只有达到了指定的延时时间,才会执行任务。

    4.PriorityBlockingQueue:具有优先级的无界阻塞队列

    5.ArrayBlockingQueue:用数组实现的有界阻塞队列。如果任务来临,核心线程数满了,队列满了,最大线程数也满了,就会出错或者执行饱和策略。

    回归正题,介绍新的构造器参数。
    ThreadFactory:创建线程的工厂,事实上大部分时候都不用管这个参数,这个参数是用来给线程配置信息的。

    RejectedExecutionHandler:饱和(拒绝)策略
    该策略是当线程数已满且都在工作中、队列也都满了的时候,所要采用的应对策略,它是线程池的一种保护机制。
    拒绝策略共有下面四种:

    1. AbortPolicy:默认策略,表示无法处理新任务,并抛出RejectedExecutionException 异常 。
      具体实例就如本文开头测试的RejectedExecutionException图所示。
    2. CallerRunsPolicy:由调用者所在的线程进行处理,这种策略会提供简单的反馈控制机制。
    3. DiscardPolicy:丢弃任务,但不抛出异常。使用这种策略,可能会使得无法发现系统的异常状态。
    4. DiscardOldestPolicy:将队列头部的任务抛弃,然后重新提交新任务。
    四大线程池

    好了,终于进入喜闻乐见的四大线程池部分了。前面已经说过了,四大线程池是(FixedThreadPool、CachedThreadPool、SingleThreadExecutor、ScheduledThreadPool)。
    Java将创建这四种线程池的方法都放在了Executors这个工厂类之中(当然,能不用Executors是最好哈,阿里巴巴的开发规范就有一条明确说明不准使用Executors创建线程池,要使用ThreadPoolExecutor)。
    当然了,我们之前也说过,实际上他们都是由ThreadPoolExecutor构造而成的。下面我们看一下Executors相关线程池的创造方法,你就会发现使用ThreadPoolExecutor会更有助于理解线程池而且很香哦。
    下面我们逐一看一下:
    1.CachedThreadPool
    这是一个缓冲线程池,有任务就让空闲线程/创建新线程运行,空闲线程的超时时间为60秒。

    public static ExecutorService newCachedThreadPool() {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>());
        }
    

    2.FixedThreadPool:
    创建一个固定数目的可重用的线程池。这个采用了 LinkedBlockingQueue也就是上面我们说到的无界阻塞队列。

    public static ExecutorService newFixedThreadPool(int nThreads) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
        }
    

    3.ScheduledThreadPoolExecutor
    定时线程池,没啥好讲的,看了前面的内容,相信聪明的你一下就明白辣。

    public ScheduledThreadPoolExecutor(int corePoolSize) {
            super(corePoolSize, Integer.MAX_VALUE,
                  DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
                  new DelayedWorkQueue());
        }
    

    4.SingleThreadExecutor
    一个核心线程,先进先出。

    public static ExecutorService newSingleThreadExecutor() {
            return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>()));
        }
    

    总结

    实际上,推荐ThreadPoolExecutor的使用来替代Executors,会更便于理解线程池,也能规避一些错误。要注意的就是线程池最大数量、饱和策略、阻塞队列的选择。

    后续

    天,写一篇关于线程池的文章的想法已经有一段时间了,一直懒于提笔,今天终于把债还上了.....


    image.png

    相关文章

      网友评论

        本文标题:Java再回顾(1)—线程池回顾与思考

        本文链接:https://www.haomeiwen.com/subject/hgoetktx.html