美文网首页
教你如何多人反复共用一桶洗澡水:线程池

教你如何多人反复共用一桶洗澡水:线程池

作者: 笔记本一号 | 来源:发表于2020-05-12 23:18 被阅读0次

    今天我们聊聊在JAVA世界中如何实现一桶洗澡水可供成百上千的人反复使用的故事

    意义:线程池可以反复使用已经创建的线程,减小了线程反复创建和销毁的开销,避免过多线程占用太多内存

    构造参数

    1、corePoolSize
    核心线程数,线程池初始化完成后,不会马上创建线程,而是等待任务的到来才会去创建线程,只要核心线程还没有满,每次有任务提交进来即时有其他线程处于空闲状态,线程池依然会创建新线程直到达到核心线程数量,之后就算没有任务,线程池中的线程数量也会维持核心线程数
    2、maxPooolSize
    最大线程数,当任务存储队列的任务达到了设置的最大值时,线程池就会去创建新的线程去处理,线程池的数量最多只能是maxPooolSize的数量,而在等待了超设置的存活时间后依旧没有新的任务处理,非核心线程就会被销毁,maxPooolSize可以是非常大的数值,可以设置为Integer.MAX_VALUE
    3、keepAliveTime
    保持存活时间,当非核心线程空闲时间超过了keepAliveTime,就会被销毁
    4、workQueue
    任务存储队列,当线程池没有空闲的核心线程时,将缓存提交的任务到队列中排队等待核心线程的处理,当队列超过设置的值时,线程池将会创建新的线程去处理队列的任务,如果线程池中的线程数量达了maxPooolSize,队列也达到了设置值,将会执行拒绝策略
    小知识:当队列满的时候才会去创建多余corePoolSize的线程去处理任务,如果这是个无限队列(如linkBlockingQueue),无论队列中有多少任务,线程池都不会去创建多于corePoolSize的线程去处理任务,所以任务存储队列对线程池的影响是非常大的
    5、threadFactor
    当线程池需要新的线程时,会使用threadFactor来生成新的线程,线程工厂默认使用的是DefaultThreadFactory,通过线程工厂可以设置线程的线程组、线程名、优先级、是否是守护线程等,一般情况下使用默认的即可
    6、Handler
    线程池无法接受新提交的任务时就会使用拒绝策略
    7、unit
    线程存活时间单位

    workQueue线程池的工作队列

    队列的类型主要有以下几个:

    1、SynchronousQuene

    不缓存任务的阻塞队列,SynchronousQuene内部没有容量,当任务被提交进来后就会要求线程马上处理,不会去缓存新进来的任务,如果没有足够的核心线程处理,则会创建新线程去处理,当到达maxPooolSize时就执行拒绝策略,SynchronousQuene适用于任务数不多的场景,如果使用了SynchronousQuene应当把maxPooolSize设置得很大,这样才会避免队列执行拒绝策略

    2、LinkedBlockingQuene

    无界阻塞队列,这个队列是基于链表的队列,按照FIFO排序,使用这个队列时由于队列是无界的,所以无论有多少任务都不会排满队列容量,因此线程池不管核心线程是否空闲,maxPooolSize有多大都不会创建任何非核心线程,此队列适用于任务数量非常大的场景,但是风险就在于如果线程的处理速度跟不上任务的生产速度,任务缓存数量过多造成OOM,所以使用此队列尽量将核心线程数设置的大些

    3、ArrayBlockingQueue

    有界阻塞队列,这个队列是基于数组的队列,按照FIFO排序,这个队列的容量是可以设置的,当任务数排满队列容量时线程池就会创建新的线程去处理任务,当队列排满,线程池的线程数量也到达maxPooolSize后则会执行拒绝策略

    4、PriorityBlockingQueue

    具有优先级的无界阻塞队列,队列可以按照优先级进行内部元素排序,队列的任务需要实现Comparable 接口,才能通过使用compareTo()方法进行排序。

    5、DelayQueue

    是一个具有延迟时效的无界队列,只有在延迟期满后才能从队列中取出任务

    拒绝策略

    当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize时,如果还有任务到来就会采取任务拒绝策略,通常有以下四种策略:

    ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。这是线程池默认的拒绝策略
    ThreadPoolExecutor.DiscardPolicy:丢弃任务,但是不抛出异常。
    ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新提交被拒绝的任务
    ThreadPoolExecutor.CallerRunsPolicy:由调用线程(提交任务的线程)处理该任务,例如如果是在main方法中提交的,就由main方法去执行这个任务

    JDK已被设定好的线程池类型:

    在我们平时开发中我们总喜欢盲目使用类似newFixedThreadPool(),newCachedThreadPool(),无脑使用它们真的好吗,我分析分析几种比较常见的线程池类型:
    1、newFixedThreadPool()
    不多BB直接看源码,看看这个方法的构造方法的

    public static ExecutorService newFixedThreadPool(int nThreads) {
     return new ThreadPoolExecutor(
        nThreads//核心线程数,
        nThreads//最大线程数,
        0L//存活时间,
        TimeUnit.MILLISECONDS//存活时间单位,                        
        new LinkedBlockingQueue<Runnable>()//无界队列
    );
        }
    
    
      public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>(),
                                          threadFactory);
        }
    
    
     public ThreadPoolExecutor(int corePoolSize//核心线程数,
                                  int maximumPoolSize//最大线程数,
                                  long keepAliveTime//存活时间,
                                  TimeUnit unit//存活时间单位,
                                  BlockingQueue<Runnable> workQueue//队列) {
            this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                 Executors.defaultThreadFactory(), defaultHandler);
        }
    

    我发现这个newFixedThreadPool()是一个使用的阻塞队列是无界的,也就是说newFixedThreadPool()可以接受无限多的任务,所以永远不会执行拒绝策略,但是无论接受了多少任务,线程池都不会创建非核心线程,所以这个线程的缺点就是在并发量高,任务处理速度慢的情况下可能队列数太多造成OOM。核心线程池等于最大线程池,当前的线程数能够比较稳定保证一个数。能够避免频繁回收线程和创建线程。故适用于处理cpu密集型的任务,确保cpu在长期被工作线程使用的情况下,尽可能少的分配线程,即适用长期的任务。
    2、newCachedThreadPool()
    不多BB直接看源码,依旧是看构造函数

    public static ExecutorService newCachedThreadPool() {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>());
        }
     public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>(),
                                          threadFactory);
        }
    

    我们看到newCachedThreadPool()使用的是SynchronousQueue队列,这是一个不会缓存任务的队列,并且设置了核心线程数为0,最大线程数可以看做是无限大,所以永远不会执行拒绝策略,线程的存活时间是60秒,我们可以知道这个类型的线程池每接受一个线程就会马上创建新的线程去处理任务,并且会清理掉线程池中空闲超过60秒的线程,这个线程池由于没有排队阻塞所以处理任务的速度会较其他的线程池快,但是在并发量很大的时候无节制的创建和销毁线程会造成系统很大的资源开销。这个线程池是适用于并发不固定的短期小任务
    3、newSingleThreadExecutor()
    不多BB直接看源码,依旧是看构造函数

    public static ExecutorService newSingleThreadExecutor() {
            return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>()));
        }
    

    我们看到newSingleThreadExecutor使用的是无界队列,所以永远不会执行拒绝策略,但是核心线程数和最大线程数只是1而已,所以newSingleThreadExecutor无论接收多少任务都会只会用一个线程去处理任务,任务只会一个一个的被处理,相当于串行处理,适用串行化任务

    4、newScheduledThreadPool()
    这个的源码比较麻烦,我通过手写一个事例说明:

    public class TestTask implements Runnable {
        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName());
        }
    }
      public static void main(String[] args) {
            //核心线程数设置为3
            ScheduledExecutorService scp = Executors.newScheduledThreadPool(3);
            //延迟5秒
             scp.schedule(new TestTask(),5, TimeUnit.SECONDS);
            //延迟5秒,每两秒执行一次
            scp.scheduleAtFixedRate(new TestTask(),5,2,TimeUnit.SECONDS);
        }
    

    这四个比较典型的线程池是已经被JDK设置好了的,如果没有学习过线程池基础知识的程序员可能只会无脑使用它们,但是通过分析它不一定适合我们的业务,实际的使用还是要看我们的业务具体设置线程池的参数值,所以学会自定义线程池是很有必要的
    自定义线程池:

         ThreadPoolExecutor threadPoolExecutor =
                    new ThreadPoolExecutor(3, 
                            5, 
                            30, TimeUnit.SECONDS, 
                            new ArrayBlockingQueue<Runnable>(20));
    

    线程池的基本使用

    不多BB直接上代码

        private static ExecutorService fx = Executors.newFixedThreadPool(3);
        class PooTaskReturn implements Callable<String>{
            @Override
            public String call() throws Exception {
                return "有返回值的线程池方法";
            }
        }
        class PoolTask implements Runnable{
            @Override
            public void run() {
                System.out.println("无返回值的线程池方法");
            }
        }
        @Test
        public  void test(){
            for (int i=0;i<1000;i++) {
                fx.execute(new PoolTask());
                fx.execute(()->{
                    System.out.println("使用匿名内部类");
                });
            }
            List<Future<String>> fls=new ArrayList<>();
            for (int i=0;i<1000;i++) {
                Future<String> submit = fx.submit(new PooTaskReturn());
                Future<String> lambda=fx.submit(()->{
                    return "使用匿名内部类";
                });
               fls.add(submit);
               fls.add(lambda);
            }
    
            fls.forEach(l-> { try {
                    System.out.println(l.get());
                }catch (Exception e) {
                    System.out.println(e);
                }
                    });
            //关闭线程池,使用shutdown关闭时会调用拒绝策略拒绝所有新的任务进入队列,
            // 然后等待所有的任务,包括队列中的任务执行完成
            fx.shutdown();
            //关闭线程池,使用shutdown关闭时会调用拒绝策略拒绝所有新的任务进入队列,
            // 然后把尝试关闭正在执行的任务
            //将队列中的任务直接丢弃
            fx.shutdownNow();
            //所有的任务已经完成,返回true
            fx.isTerminated();
            //收到关闭指令后返回true,即执行了shutdownNow或者shutdown
            fx.isShutdown();
        }
    

    我看看到几个关闭的命令,我们把它列出来:
    shutdown():关闭线程池,使用shutdown关闭时会调用拒绝策略拒绝所有新的任务进入队列,然后等待所有的任务,包括队列中的任务执行完成
    shutdownNow():关闭线程池,使用shutdown关闭时会调用拒绝策略拒绝所有新的任务进入队列,然后把尝试关闭正在执行的任务,将队列中的任务直接丢弃
    isTerminated():所有的任务已经完成,返回true
    isShutdown():收到关闭指令后返回true,即执行了shutdownNow或者shutdown

    CPU 密集型(I/O bound)

    CPU密集型也叫计算密集型,指的是系统的硬盘、内存性能相对CPU要好很多,此时,系统运作大部分的状况是CPU Loading 100%,CPU要读/写I/O(硬盘/内存),I/O在很短的时间就可以完成,而CPU还有许多运算要处理,CPU Loading很高。

    CPU密集的意思是该任务需要大量的运算,而没有阻塞,CPU一直全速运行。CPU密集任务只有在真正的多核CPU上才可能得到加速(通过多线程),而在单核CPU上,无论你开几个模拟的多线程该任务都不可能得到加速,因为CPU总的运算能力就那些。

    CPU 使用率较高(例如:计算一些复杂的运算,逻辑处理等情况)非常多的情况下,线程数一般只需要设置为CPU核心数的线程个数就可以了。 这一类型多出现在开发中的一些业务复杂计算和逻辑处理过程中。

    I/O 密集型(I/O bound)

    IO密集型指的是系统的CPU性能相对硬盘、内存要好很多,此时,系统运作,大部分的状况是CPU在等I/O (硬盘/内存) 的读/写操作,此时CPU Loading并不高。I/O bound的程序一般在达到性能极限时,CPU占用率仍然较低。这可能是因为任务本身需要大量I/O操作,而pipeline做得不是很好,没有充分利用处理器能力。

    CPU 使用率较低,程序中会存在大量的 I/O 操作占用时间,导致线程空余时间很多,所以通常就需要开CPU核心数两倍的线程。当线程进行 I/O 操作 CPU 空闲时,启用其他线程继续使用 CPU,以提高 CPU 的使用率。例如:数据库交互,文件上传下载,网络传输等。

    线程等待时间所占比例越高,需要越多线程,启用其他线程继续使用CPU,以此提高CPU的利用率;线程 CPU 时间所占比例越高,需要越少的线程,这一类型在开发中主要出现在一些计算业务频繁的逻辑中。

    方法一:

    由于IO密集型任务线程并不是一直在执行任务,则应配置尽可能多的线程,如CPU核数*2

    方法二:

    IO密集型,即该任务需要大量的IO,即大量的阻塞。在单线程上运IO密集型的任务会导致浪费大量的CPU运算能力浪费在等待。所以在IO密集型任务中使用多线程可以大大的加速程序运行,即使在单核CPU上,这种加速主要就是利用了被浪费掉的阻塞时间。
    IO密集型时,大部分线程都阻塞,故需要多配置线程数:
    参考公式:CPU核数 /(1 - 阻系数)
    比如8核CPU:8/(1 - 0.9)=80个线程数
    阻塞系数在0.8~0.9之间

    总结:

    1.一个计算为主的程序(CPU密集型程序),多线程跑的时候,可以充分利用起所有的 CPU 核心数,比如说 8 个核心的CPU ,开8 个线程的时候,可以同时跑 8 个线程的运算任务,此时是最大效率。但是如果线程远远超出 CPU 核心数量,反而会使得任务效率下降,因为频繁的切换线程也是要消耗时间的。因此对于 CPU 密集型的任务来说,线程数等于 CPU 数是最好的了。

    2.如果是一个磁盘或网络为主的程序(IO密集型程序),一个线程处在 IO 等待的时候,另一个线程还可以在 CPU 里面跑,有时候 CPU 闲着没事干,所有的线程都在等着 IO,这时候他们就是同时的了,而单线程的话此时还是在一个一个等待的。我们都知道 IO 的速度比起 CPU 来是很慢的。此时线程数等于CPU核心数的两倍是最佳的。

    核心线程数计算公式

    IO密集型:核心线程数 = CPU核数 / (1-阻塞系数)

    CPU密集型:核心线程数 = CPU核数 + 1

    IO密集型:核心线程数 = CPU核数 * 2

    相关文章

      网友评论

          本文标题:教你如何多人反复共用一桶洗澡水:线程池

          本文链接:https://www.haomeiwen.com/subject/zugenhtx.html