美文网首页Java并发
ThreadPoolExecutor线程池源码解读keepAli

ThreadPoolExecutor线程池源码解读keepAli

作者: blogforum | 来源:发表于2019-08-16 13:05 被阅读0次

    keepAliveTime参数主要是把除了核心线程 其他线程当超过等待时间keepAliveTime 就会进行线程移除
    接下来主要讲下线程是如何创建 执行 然后操作等待时间是如何移除的 先介绍两个线程池的核心参数

    1. HashSet<Worker> workers工作类集合

    Worker这个其实就是保存在线程池里的线程 实现了Runnable接口 当线程池中开启10个线程 就会创建10个Worker类 这些worker会保存在workers工作类集合中 Worker类里面主要有一下两个核心参数

        private final class Worker
            extends AbstractQueuedSynchronizer
            implements Runnable
        {
            /** 执行Worker的线程 */
            final Thread thread;
            /** 这里存放的是使用方传给线程池的线程 */
            Runnable firstTask;
            Worker(Runnable firstTask) {
                setState(-1); // inhibit interrupts until runWorker
                this.firstTask = firstTask;
                // 把自己本身worker放入到thread中 后面直接执行thread.start 就会执行Worker的run方法 然后在run方法中执行firstTask.run
                this.thread = getThreadFactory().newThread(this);
            }
    
    1. BlockingQueue< Runnable > workQueue 等待队列

    这个其实就是当线程达到核心线程数以后 会把待执行的队列放入到等待队列中等待执行

    execute(Runnable command)

    当第一次执行线程池的execute 会因为当前线程数小于核心线程数而执行这段代码 当然核心线程数设置为0除外

            if (workerCountOf(c) < corePoolSize) {
                if (addWorker(command, true))
                    return;
                c = ctl.get();
            }
    

    这里核心是addWorker方法 这里会把传进来当需要执行的线程command放入到刚刚说的worker工作类中 然后再放入到workers工作类集合中 然后启动worker线程

              //只拉取核心代码
              // firstTask为execute(Runnable command)传入的需要执行的线程
               w = new Worker(firstTask);
               final Thread t = w.thread;
               workers.add(w);
               t.start();
    

    之后看下Worker的run方法 里面会先执行runWorker方法 然后在方法中执行需要执行的线程
    这里核心主要代码有两句

    //task为前面execute(Runnable command)传入的需要执行的线程 如果不为空则执行执行task.run 如果为空 则通过getTask() 获取等待队列workQueue里需要执行的线程
    while (task != null || (task = getTask()) != null)
    

    重点 keepAliveTime的体现核心就是getTask()方法中

    当前执行的线程是核心线程 则timed=false

        //只拉取核心代码
         Runnable r = timed ?
         workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
         workQueue.take();
    

    就会执行workQueue.take()从等待队列里获取一个需要执行的线程任务 因为 BlockingQueue< Runnable > workQueue是阻塞队列 如果里面没有需要执行的线程任务 则会一直阻塞等待 一直到获取线程任务

           workQueue.take();
    

    重点 如果timed=true 执行workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : keepAliveTime就是在这里发挥作用的

    poll这行代码就是会等待keepAliveTime设置的时间 超过超时时间就返回null

        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
    

    当超过超时时间 getTask就会返回null

    while (task != null || (task = getTask()) != null)
    

    推出循环 并且设置

    completedAbruptly = false;
    

    然后执行processWorkerExit方法

    processWorkerExit(w, completedAbruptly)
    

    processWorkerExit这里核心就是把当前执行的worker从worker工作类集合中删除

    workers.remove(w);
    

    然后还有一段逻辑

             if (!completedAbruptly) {
             //allowCoreThreadTimeOut默认为false  如果设置为true 则代表线程池中corePoolSize线程空闲时间达到keepAliveTime也将关闭  上面说的timed 如果allowCoreThreadTimeOut为true 则timed也会为true 也就会进入poll方法 通过这个来实现超时关闭核心线程数
                 int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                 if (min == 0 && ! workQueue.isEmpty())
                     min = 1;
                  //当前线程数大于核心线程数 直接返回 然后线程关闭
                 if (workerCountOf(c) >= min)
                     return; // replacement not needed
             }
             addWorker(null, false);
    

    注意上面的 addWorker(null, false); 这个其实是个注意点优化点 当执行一开始自己写的Runnable command 的run方法如果执行异常 也会走上面的代码completedAbruptly=true 然后执行addWorker(null, false); 创建一个空的worker线程继续执行

    重点:所以run里最好是把所有的异常捕获 而不要抛出 不然抛出异常 就会立刻销毁线程 然后创建一个新线程 线程池的作用就没有

    相关文章

      网友评论

        本文标题:ThreadPoolExecutor线程池源码解读keepAli

        本文链接:https://www.haomeiwen.com/subject/odvfsctx.html