线程池

作者: 暮雨沉沦 | 来源:发表于2020-09-19 13:53 被阅读0次

    一、简介

    一种管理线程的技术,可以缓存线程

    (1)复用已经存在的线程,减少线程的创建和销毁

    (2)提高响应时间,已有线程可以直接执行任务,无需等待线程创建

    (3)方便管理线程数量,防止过多的线程占用太多内存,同时能防止过多的线程切换,提高CPU效率

    二、构造函数

    ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)

    int corePoolSize: 核心线程数,在不设置 allowCoreThreadTimeOut 为 true 的情况下,核心线程就算没事做也不会被销毁。

    int maximumPoolSize : 最大线程数

    long keepAliveTime:超时时长,一个非核心线程(设置 allowCoreThreadTimeOut 为 true 也同样作用于核心线程)在处于闲置状态(没事做)超过这个时长就 会被销毁。

    TimeUnit unit:时间单位,秒、毫秒、微秒等

    BlockingQueue<Runnable> workQueue: 阻塞队列

    ThreadFactory threadFactory:线程池工厂,可以自己实现,方便设置线程的名字以及优先级

    RejectedExecutionHandler handler:拒绝策略

    image.png

    三、常见的线程池

    CachedThreadPool

    可缓存线程池

    Executors.newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);
    }
    

    1. 内部没有核心线程,线程的数量是有没限制的

    2. 在创建任务时,若有空闲的线程时则复用空闲的线程,若没有则新建线程。

    3. 没有工作的线程(闲置状态)在超过了60S没有任务,就会销毁。

    FixedThreadPool

    定长线程池

    Executors.newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
       return new ThreadPoolExecutor(nThreads, nThreads,
                                     0L, TimeUnit.MILLISECONDS,
                                     new LinkedBlockingQueue<Runnable>(),
                                     threadFactory);
    }
    

    1. 最大线程数等于核心线程数,所以在默认情况下,该线程池的线程不会因为闲置状态超时而被销毁。

    2. 如果当前线程数小于核心线程数,并且也有闲置线程的时候提交了任务,这时也不会去复用之前的闲置线程,会创建新的线程去执行任务。如果当前执行任务数大于了核心线程数,大于的部分就会进入队列等待。等着有闲置的线程来执行这个任务。LinkedBlockingQueue默认大小是Integer.MAX_VALUE可能导致内存占用过多

    ScheduledThreadPool

    定时任务线程池

    public static ScheduledExecutorService newScheduledThreadPool(
            int corePoolSize, ThreadFactory threadFactory) {
        return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
    }
    
    public ScheduledThreadPoolExecutor(int corePoolSize,
                                       ThreadFactory threadFactory,
                                       RejectedExecutionHandler handler) {
        super(corePoolSize, Integer.MAX_VALUE,
              DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
              new DelayedWorkQueue(), threadFactory, handler);
    }
    

    1. 设置了核心线程数,最大线程数也是 Integer.MAX_VALUE。

    2. 这个线程池是上述4个中为唯一个有延迟执行和周期执行任务的线程池。

    SingleThreadExecutor

    单线程池

    public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>(),
                                    threadFactory));
    }
    

    1、只有一个线程且不回收

    2、LinkedBlockingQueue默认大小是Integer.MAX_VALUE可能导致内存占用过多

    四、阻塞队列

    常用的几种队列

    (1)ArrayBlockingQueue:基于数组实现,固定大小,其构造必须指定大小。其所含的对象是FIFO顺序排序的。

    (2)LinkedBlockingQueue:基于链表实现,默认大小Integer.MAX_VALUE,可传入。其所含的对象是FIFO顺序排序的。

    (3)PriorityBlockingQueue:类似于LinkedBlockingQueue,但是其所含对象的排序不是FIFO,而是依据对象的自然顺序或者构造函数的Comparator决定。无界队列

    (4)SynchronousQueue:不存储元素的阻塞队列,大小为0,每一个put都必须等待take操作,提供任务的转发。

    (5) DelayQueue:基于PriorityQueue, 支持延时获取元素, 无界

    常用方法

    抛异常 返回特殊值 一直阻塞 超时退出
    插入数据 add(e) offer(e) put(e) offer(e, time, unit)
    移除数据 remove(e) poll() take() poll(time, unit)

    take:

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }
    

    五、拒绝策略

    1、拒绝时机

    • 线程池没有能力处理新任务:最大线程数大于且队列数量到达了最大值

    • 线程池被调用了shutdown关闭之后,还没有完全停止的时候。

    2、策略

    • AbortPolicy:默认的策略,直接抛出RejectedExecutionException ,RuntimeException

    • DiscardPolicy:默默丢弃任务,不进行任何通知。实际上内部啥也没干

    • DiscardOldestPolicy:丢弃掉在队列中存在时间最久的任务,然后重新提交任务

    • CallerRunsPolicy:让提交任务的线程去执行任务,不会丢失业务数据,阻塞了提交任务的线程,减缓提交任务的速度

    3、拒绝策略使用

    关键业务要自定义拒绝策略,可以记录日志,也可以把拒绝结果返回给用户

    六、合理使用线程池

    java 开发手册<并发处理>中片段:

    [图片上传失败...(image-cdc18e-1600494356170)]

    1、自定义线程池时,使用自定义ThreadFactory,指定线程名,方便排查问题

    2、分析业务数量级,如果会很大,必须自定义线程池,限定最大线程池,使用有界队列,防止jvm OOM

    3、即使限制了最大线程数,过多的线程会导致线程切换频繁,cpu效率地下,也要考虑为别的业务让出cpu

    4、核心线程数,看任务,如果时一天执行一次,设置为0,跑完释放内存。如果常用,看任务量

    5、拒绝策略要重写,默认的策略AbortPolicy会导致crash

    七、线程池的复用原理

    1、回顾线程创建

    new Thread(new Runnable() {
        @Override
        public void run() {
            
        }
    }).start();
    

    创建一个线程,调用start()启动线程,线程执行完run方法,就进入Dead状态。start()方法只能调用一次

    2、线程池提交任务

    ExecutorService executorService = Executors.newFixedThreadPool(2);
    executorService.submit(new Runnable() {
        @Override
        public void run() {
            
        }
    });
    

    3、复用原理

    复用线程的本质:提交任务,在任务内死循环执行提交下来的任务

    public void run() {
      while (true) {
        if(tasks available) {
          Runnable task = taskqueue.dequeue();
          task.run();
        } else{
          // wait or shutdown
        }
      }
    }
    

    线程创建执行任务流程:

    image.png

    (1) execute提交任务

    判断当前工作线程数是否小于核心线程数,需要创建新线程则进入addWorker

    (2) addWorker 创建线程

      private final class Worker implements Runnable{
          final Thread thread;
          Runnable firstTask;
          Worker(Runnable firstTask) {
            this.firstTask = firstTask;
            // 把 Worker 作为 thread 运行的任务
            this.thread = getThreadFactory().newThread(this);
        }
          public void run() {
              runWorker(this);
          }
      }
    

    (3)创建Worker

    Worker以提交下来的任务作为构造参数,并创建一个新的线程

    w = new Worker(firstTask);
    final Thread t = w.thread;
    workers.add(w);
    

    (4)runWorker

    调用Worker内部变量thread的start()方法启动线程

    final Thread t = worker.thread;
     t.start();
    

    线程启动后,调用到Worker内部的run方法

      public void run() {
              runWorker(this);
          }
    

    (5)循环执行任务

    final void runWorker(Worker w) {
        Runnable task = w.firstTask;
        while (task != null || (task = getTask()) != null) {
            try {
                task.run();
            } finally {
                task = null;
            }
        }
    }
    
    Runnable r = timed ?
        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
        workQueue.take()
    

    八、关闭线程池

    shutdown() : 调用之后,线程池不是立刻就关闭,等待任务队列中的任务执行完才彻底关闭。同时,还会根据拒绝策略拒绝新提交的任务

    isShutdown(): 返回线程池是否已经开始了关闭工作,返回不代表已经彻底关闭。

    isTerminated(): 检测线程池是否真正的关闭了

    shutdownNow(): 强制关闭线程池,给所有线程发送interrupt中断信号,然后所有的任务转移到另外一个List中。方便调用这做一些补救操作。

    参考文档

    Java 阻塞队列--BlockingQueue: https://www.cnblogs.com/bjxq-cs88/p/9759571.html

    Java 线程池中的线程复用是如何实现:https://www.jianshu.com/p/f84a61917b03

    Tomcat 高并发之道原理拆解与性能调优:https://mp.weixin.qq.com/s/4_En4hHeolYyO0RcliGxCQ

    Java线程池的拒绝策略:https://www.cnblogs.com/eric-fang/p/11584142.html

    相关文章

      网友评论

          本文标题:线程池

          本文链接:https://www.haomeiwen.com/subject/mruwsktx.html