美文网首页
Java 并发编程基础系列 (三) 线程池 2 ——Thre

Java 并发编程基础系列 (三) 线程池 2 ——Thre

作者: Gxgeek | 来源:发表于2017-12-22 18:06 被阅读0次

    ThreadPoolExecutor

    刚开始用的 时候 一直使用 Executors
    但是 由于 idea 装了 阿里代码检查插件 说明 这样是不合规范的 那么 我们就来彻底了解一下 ThreadPoolExecutor 的奥秘。

    Executors各个方法的弊端:
    1)newFixedThreadPool和newSingleThreadExecutor:
    主要问题是堆积的请求处理队列可能会耗费非常大的内存,甚至OOM。
    2)newCachedThreadPool和newScheduledThreadPool:
    主要问题是线程数最大数是Integer.MAX_VALUE,可能会创建数量非常多的线程,甚至OOM。
    

    构造函数

        public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler) {
            if (corePoolSize < 0 ||
                maximumPoolSize <= 0 ||
                maximumPoolSize < corePoolSize ||
                keepAliveTime < 0)
                throw new IllegalArgumentException();
            if (workQueue == null || threadFactory == null || handler == null)
                throw new NullPointerException();
            this.acc = System.getSecurityManager() == null ?
                    null :
                    AccessController.getContext();
            this.corePoolSize = corePoolSize;
            this.maximumPoolSize = maximumPoolSize;
            this.workQueue = workQueue;
            this.keepAliveTime = unit.toNanos(keepAliveTime);
            this.threadFactory = threadFactory;
            this.handler = handler;
        }
    
    • corePoolSize:线程池的核心线程数(线程池的基本大小),线程池中运行的线程数也永远不会超过 corePoolSize 个,默认情况下可以一直存活。可以通过设置allowCoreThreadTimeOut为True,此时 核心线程数就是0,此时keepAliveTime控制所有线程的超时时间。如果调用了prestartCoreThread() 会提前创建 核心线程

    • maximumPoolSize:线程池中允许的最大线程数。线程池的阻塞队列满了之后,如果还有任务提交,如果当前的线程数小于maximumPoolSize,则会新建线程来执行任务。注意,如果使用的是无界队列,该参数也就没有什么效果了;

    • keepAliveTime: 线程空闲的时间。线程的创建和销毁是需要代价的。线程执行完任务后不会立即销毁,而是继续存活一段时间:keepAliveTime。默认情况下,该参数只有在线程数大于corePoolSize时才会生效;

    • unit :是一个枚举,表示 keepAliveTime 的单位;

    • workQueue:表示存放任务的BlockingQueue<Runnable队列。

      • ArrayBlockingQueue:基于数组结构的有界阻塞队列,FIFO。
      • LinkedBlockingQueue:基于链表结构的有界阻塞队列,FIFO。
      • SynchronousQueue:不存储元素的阻塞队列,每个插入操作都必须等待一个移出操作,反之亦然。
      • PriorityBlockingQueue:具有优先界别的阻塞队列。

      阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。具体的实现类有LinkedBlockingQueue,ArrayBlockingQueued等。一般其内部的都是通过Lock和Condition(显示锁(Lock)及Condition的学习与使用)来实现阻塞和唤醒。

    • threadFactory
      用于设置创建线程的工厂。该对象可以通过Executors.defaultThreadFactory(),也可以 通过 guava提供的ThreadFactoryBuilder 快速的的给 线程池设置有意义的名字 如下:
      JDK 自带的 DefaultFactory如下
    Executors.defaultThreadFactory()
    public static ThreadFactory defaultThreadFactory() {
        return new DefaultThreadFactory();
    }
    static class DefaultThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;
    
        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                          poolNumber.getAndIncrement() +
                         "-thread-";
        }
    
        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }
    
    

    使用默认线程工厂 创建就是 默认优先级为Thread.NORM_PRIORITY ,非守护线程

    线程命名

    这点得反复强调。对正在运行的JVM进行线程转储(thread dump)或者调试时,线程池默认的命名机制是pool-N-thread-M,这里N是线程池的序号(每新创建一个线程池,这个N都会加一),而M是池 里线程的序号。比方说,pool-2-thread-3指的是JVM生命周期中第二个线程池里的第三个线程。

    import com.google.common.util.concurrent.ThreadFactoryBuilder;
     
    final ThreadFactory threadFactory = new ThreadFactoryBuilder()
            .setNameFormat("Orders-%d")
            .setDaemon(true)
            .build();
    final ExecutorService executorService = Executors.newFixedThreadPool(10, threadFactory);
    
    
    • handler

    RejectedExecutionHandler,线程池的拒绝策略。所谓拒绝策略,是指将任务添加到线程池中时,线程池拒绝该任务所采取的相应策略。当向线程池中提交任务时,如果此时线程池中的线程已经饱和了,而且阻塞队列也已经满了,则线程池会选择一种拒绝策略来处理该任务。
    线程池提供了四种拒绝策略:

    - AbortPolicy:直接抛出异常,默认策略;
    - CallerRunsPolicy:用调用者所在的线程来执行任务;
    - DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
    - DiscardPolicy:直接丢弃任务;
    当然我们也可以实现自己的拒绝策略,例如记录日志等等,实现RejectedExecutionHandler接口即可。
    

    继续向下,再看execute() 方法之前 我们先看下 线程池的 字段

        //ctl是一个原子整数包装
        //两个概念领域
        // workerCount,表示有效的线程数
        // runState,指示是否运行,关闭等
        private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
        private static final int COUNT_BITS = Integer.SIZE - 3;
        private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
        
        /* runState提供了主要的生命周期控制,取值为:
        *
        *RUNNING:接受新的任务和处理排队的任务
        *SHUTDOWN:不接受新的任务,但处理排队的任务
        *STOP:不接受新的任务,不处理排队的任务,并中断正在进行的任务
        *TIDYING:所有任务已经终止,workerCount为零,线程转换到状态TIDYING,将运行terminate()钩子方法
        * TERMINATED:已终止()已完成
        */
        // runState is stored in the high-order bits
        private static final int RUNNING    = -1 << COUNT_BITS;
        private static final int SHUTDOWN   =  0 << COUNT_BITS;
        private static final int STOP       =  1 << COUNT_BITS;
        private static final int TIDYING    =  2 << COUNT_BITS;
        private static final int TERMINATED =  3 << COUNT_BITS;
    
        // Packing and unpacking ctl
        private static int runStateOf(int c)     { return c & ~CAPACITY; }
        private static int workerCountOf(int c)  { return c & CAPACITY; }
        private static int ctlOf(int rs, int wc) { return rs | wc; }
    
    

    变量 ctl 定义为AtomicInteger,其功能非常强大,记录了“线程池中的任务数量”和“线程池的状态”两个信息。共32位,其中高3位表示"线程池状态",低29位表示"线程池中的任务数量"。

    线程状态改变图
    RUNNING            -- 对应的高3位值是111。
    SHUTDOWN       -- 对应的高3位值是000。
    STOP                   -- 对应的高3位值是001。
    TIDYING              -- 对应的高3位值是010。
    TERMINATED     -- 对应的高3位值是011。
    

    再看最为核心的方法吧:

        public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
            /*
             * Proceed in 3 steps:
             *
             * 1. If fewer than corePoolSize threads are running, try to
             * start a new thread with the given command as its first
             * task.  The call to addWorker atomically checks runState and
             * workerCount, and so prevents false alarms that would add
             * threads when it shouldn't, by returning false.
             *
             * 2. If a task can be successfully queued, then we still need
             * to double-check whether we should have added a thread
             * (because existing ones died since last checking) or that
             * the pool shut down since entry into this method. So we
             * recheck state and if necessary roll back the enqueuing if
             * stopped, or start a new thread if there are none.
             *
             * 3. If we cannot queue task, then we try to add a new
             * thread.  If it fails, we know we are shut down or saturated
             * and so reject the task.
             */
            //1.start
            int c = ctl.get();
            // 当前线程数 < corePoolSize
            if (workerCountOf(c) < corePoolSize) {
                // 直接启动新的线程。
                if (addWorker(command, true))
                    return;
                c = ctl.get();
            }//1.end
            
            //2 .start 
            if (isRunning(c) && workQueue.offer(command)) {//2.end
                int recheck = ctl.get();
                //2.1 start
                if (! isRunning(recheck) && remove(command))//2.1 end
                    reject(command);
                //2.2start
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false);//2,2 end
            }
            //3.start
            else if (!addWorker(command, false))
                reject(command);
            //3.end
        }
    
    
    • 1.如果线程池当前线程数小于corePoolSize,则调用addWorker创建新线程执行任务,成功返回true,失败执行步骤2。
    • 2.如果线程池处于RUNNING状态,则尝试加入阻塞队列,如果加入阻塞队列成功,当前正在执行,且往工作队列中添加成功,就再次获取当前工作线程数
      • , 我们还需要再次检查一下此时线程池是否还是 RUNNING 状态, 如果不是的话就会将原来插入队列中的那个任务删除, 然后调用 reject 方法拒绝此任务的提交;//****执行了shutdown 之类的代码***
      • 接着考虑到在我们插入任务到 workQueue 中的同时, 如果此时线程池中的线程都执行完毕并终止了,(没有线程了) 在这样的情况下刚刚插入到 workQueue 中的任务就永远不会得到执行了. 为了避免这样的情况, 因此我们由再次检查一下线程池中的线程数, 如果为零, 则调用 addWorker(null, false) 来添加一个线程.
    • 3.当workQueue.offer失败时,也就是说现在队列已满,不能再向队列里放,此时工作线程大于等于corePoolSize,创建新的线程执行该task; 加入失败表示 最大线程也满了 ,实施拒绝策略
        private boolean addWorker(Runnable firstTask, boolean core) {
            retry:
            for (;;) {
                //
                int c = ctl.get();
                //获取当前线程状态
                int rs = runStateOf(c);
    
                // Check if queue empty only if necessary.
                // 1.start
                if (rs >= SHUTDOWN &&
                    ! (rs == SHUTDOWN &&
                       firstTask == null &&
                       ! workQueue.isEmpty()))//1.end
                    return false;
    
                for (;;) {
                //线程数量
                    int wc = workerCountOf(c);
                    //2. start
                    if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))
                    //2.end
                        return false;
                    //3.
                    if (compareAndIncrementWorkerCount(c))
                        break retry;
                    c = ctl.get();  // Re-read ctl
                    //4.
                    if (runStateOf(c) != rs)
                        continue retry;
                    // else CAS failed due to workerCount change; retry inner loop
                }
            }
            
            boolean workerStarted = false;
            boolean workerAdded = false;
            Worker w = null;
            try {
                //新建线程
                w = new Worker(firstTask);
                final Thread t = w.thread;
                if (t != null) {
                    //获取主锁:mainLock
                    final ReentrantLock mainLock = this.mainLock;
                    mainLock.lock();
                    try {
                        // Recheck while holding lock.
                        // Back out on ThreadFactory failure or if
                        // shut down before lock acquired.
                        //5 .strat
                        // 线程状态
                        int rs = runStateOf(ctl.get());
                        // rs < SHUTDOWN ==> 线程处于RUNNING状态
                        // 或者线程处于SHUTDOWN状态,且firstTask == null(可能是workQueue中仍有未执行完成的任务,创建没有初始任务的worker线程执行)
                        if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
                            // 当前线程已经启动,抛出异常
                            if (t.isAlive()) // precheck that t is startable
                                throw new IllegalThreadStateException();
                            // workers是一个HashSet<Worker>
                            workers.add(w);
            // 设置最大的池大小largestPoolSize,workerAdded设置为true
                            int s = workers.size();
                            if (s > largestPoolSize)
                                largestPoolSize = s;
                            workerAdded = true;
                        }
                    } finally {
                        // 释放锁
                        mainLock.unlock();
                        //5.end
                    }  
                    // 启动线程
                    if (workerAdded) {
                        t.start();
                        workerStarted = true;
                    }
                }
            } finally {
                // 线程启动失败
                if (! workerStarted)
                    addWorkerFailed(w);
            }
            return workerStarted;
        }
    
    
    • 1.判断是否可以执行任务
      • rs >= SHUTDOWN ,表示当前线程处于SHUTDOWN ,STOP、TIDYING、TERMINATED状态
      • rs == SHUTDOWN , firstTask != null时不允许添加线程,因为线程处于SHUTDOWN 状态,不允许添加任务
      • rs == SHUTDOWN , firstTask == null,但workQueue.isEmpty() == true,不允许添加线程,因为firstTask == null是为了添加一个没有任务的线程然后再从workQueue中获取任务的,如果workQueue == null,则说明添加的任务没有任何意义。
    • 2.wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)
      • 如果当前线程数大于线程最大上限CAPACITY return false
      • 若core == true,则与corePoolSize 比较,否则与maximumPoolSize ,大于 return false
    • 3 成功CAS workCount 数量 跳出retry循环
    • 4重新读一遍工作状态,如果状态不等于之前获取的state,跳出内层循环,继续去外层循环判断
    • 5 获取线程状态 ,如果线程状态是 Runing 的或者SHUTDOWN时 但是firstTask 为空 workQueue 可能不为空
    addWorker的4种调用方式(Runnable firstTask, boolean core)

    firstTask 是否是新的任务 core 大小限制是用corecount 还是 maxcount

    • addWorker(command, true)
      • execute(Runnable ) 时调用
      • 线程数 < coreSize时,将task放入workers,如果线程数 >=
        coreSize(并发),返回false;
    • addWorker(command, false)
      • execute(Runnable ) 时调用
      • 当阻塞对列已满,尝试将新的task放入workers,如果线程数 >= maximumPoolSize,返回false;
    • addWorker(null, false)
      • execute(Runnable ) 时调用 ----- (ScheduledThreadPoolExecutor)ensurePrestart()调用 先不理 ---------(Worker run 方法最后 finally() 时候调用 )processWorkerExit()
      • 放入一个空的task到workers,此时线程数的限制是maximumPoolSize,相当于创建一个新的线程,没立马分配任务;
    • addWorker(null, true)
      放入一个空的task到workers,线程数 < coreSize。实际的使用是在prestartCoreThread()等方法,(不是ThreadPoolExecutor框架在QuantumRenderer) 有兴趣的读者可以自行阅读,在此不做详细赘述。

    在新建线程执行任务时,将讲Runnable包装成一个Worker,Woker为ThreadPoolExecutor的内部类

    Worker具体实现

    • 在addWorker中,t.start()使线程就绪,我们来看看Worker类的具体设计。

    • Worker继承AbstractQueuedSynchronizer,方便实现工作线程的中止等操作;

    • Worker实现Runnable接口,将自身作为一个task在工作线程中执行;
      addWoker中的t.start()实质上是执行Worker的run()方法:

        public void run() {
            runWorker(this);
        }
    
        final void runWorker(Worker w) {
            //1.start
            Thread wt = Thread.currentThread();
            Runnable task = w.firstTask;
            w.firstTask = null;
            w.unlock(); // allow interrupts
            boolean completedAbruptly = true;
            try {
                while (task != null || (task = getTask()) != null) {//1.end
                    w.lock();
                    // If pool is stopping, ensure thread is interrupted;
                    // if not, ensure thread is not interrupted.  This
                    // requires a recheck in second case to deal with
                    // shutdownNow race while clearing interrupt
                    //2.start 
                    if ((runStateAtLeast(ctl.get(), STOP) ||
                         (Thread.interrupted() &&
                          runStateAtLeast(ctl.get(), STOP))) &&
                        !wt.isInterrupted())
                        
                        wt.interrupt();//2.end
                    try {
                        beforeExecute(wt, task);
                        Throwable thrown = null;
                        try {
                            task.run();
                        } catch (RuntimeException x) {
                            thrown = x; throw x;
                        } catch (Error x) {
                            thrown = x; throw x;
                        } catch (Throwable x) {
                            thrown = x; throw new Error(x);
                        } finally {
                            afterExecute(task, thrown);
                        }
                    } finally {
                        task = null;
                        w.completedTasks++;
                        w.unlock();
                    }
                }
                completedAbruptly = false;
            } finally {
                processWorkerExit(w, completedAbruptly);
            }
        }
    
    

    解析

    • 1.线程启动后,释放锁,设AQS状态为0;
      获取firstTask任务并执行,执行任务前后可定制beforeExecute和afterExecute; 如果worker自己的task为null,调用getTask从阻塞队列获取等待任务执行,否则,阻塞该方法。 getTask() 方法
        private Runnable getTask() {
            boolean timedOut = false; // Did the last poll() time out?
    
            for (;;) {
                int c = ctl.get();
                int rs = runStateOf(c);
    
                // Check if queue empty only if necessary.
                //必要情况下需要检查workQueue是否为空 和 线程池状态
                if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                //尝试CAS-递减workerCount
                    decrementWorkerCount();
                    return null;
                }
    
                int wc = workerCountOf(c);
    
                // Are workers subject to culling?
            //如果线程池允许线程超时或者当前线程数大于核心线程数,则会进行超时处理
                boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
    
                if ((wc > maximumPoolSize || (timed && timedOut))
                    && (wc > 1 || workQueue.isEmpty())) {
                    if (compareAndDecrementWorkerCount(c))
                        return null;
                    continue;
                }
    
                try {
            // 从阻塞队列中获取task(阻塞)
            // 如果需要超时控制,则调用poll(),否则调用take()
                    Runnable r = timed ?
                        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                        workQueue.take();
                    if (r != null)
                        return r;
                    timedOut = true;
                } catch (InterruptedException retry) {
                    timedOut = false;
                }
            }
        }
    

    getTask循环实现:

     workQueue.poll:如果在keepAliveTime时间内阻塞队列有任务,返回该任务并执行;
     workQueue.take:如果阻塞队列为空,当前线程阻塞,当队列有任务时,线程被唤醒,执行take返回的任务。
    
    • 2
      • 如果 当线程池 是stoping时,才会被设置为中断
      • 否者清除中断 ,如果线程池状态 >= STOP ,且当前线程没有设置中断状态,则wt.interrupt()。
    运行流程
    • 根据worker获取要执行的任务task,然后调用unlock()方法释放锁,这里释放锁的主要目的在于中断,因为在new Worker时,设置的state为-1,调用unlock()方法可以将state设置为0,这里主要原因就在于interruptWorkers()方法只有在state >= 0时才会执行;
    • 通过getTask()获取执行的任务,调用task.run()执行,当然在执行之前会调用worker.lock()上锁,执行之后调用worker.unlock()放锁;
    • 在任务执行前后,可以根据业务场景自定义beforeExecute() 和 afterExecute()方法,则两个方法在ThreadPoolExecutor中是空实现;
    • 如果线程执行完成,则会调用getTask()方法从阻塞队列中获取新任务,如果阻塞队列为空,则根据是否超时来判断是否需要阻塞;
    • task == null或者抛出异常(beforeExecute()、task.run()、afterExecute()均有可能)导致worker线程终止,则调用processWorkerExit()方法处理worker退出流程。

    在runWorker()方法中,无论最终结果如何,都会执行processWorkerExit()方法对worker进行退出处理。

        private void processWorkerExit(Worker w, boolean completedAbruptly) {
        //执行失败 线程数 -1 
            if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
                decrementWorkerCount();
    
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                completedTaskCount += w.completedTasks;
                //从workers 中移除 worker
                workers.remove(w);
            } finally {
                mainLock.unlock();
            }
        // 有worker线程移除,可能是最后一个线程退出需要尝试终止线程池
            tryTerminate();
            
            int c = ctl.get();
        // 如果线程为running或shutdown状态,即tryTerminate()没有成功终止线程池
            if (runStateLessThan(c, STOP)) {
            // 正常退出,计算min:需要维护的最小线程数量
                if (!completedAbruptly) {
            //    allowCoreThreadTimeOut 默认 false
                    int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                    if (min == 0 && ! workQueue.isEmpty())
                        min = 1;
                // 如果线程数量大于最少数量min,直接返回,不需要新增线程
                    if (workerCountOf(c) >= min)
                        return; // replacement not needed
                }
                //增加线程
                addWorker(null, false);
            }
        }
    
    
    总结:

    首先completedAbruptly的值来判断是否需要对线程数-1处理,如果completedAbruptly == true,说明在任务运行过程中出现了异常,那么需要进行减1处理,否则不需要,因为减1处理在getTask()方法中处理了。然后从HashSet中移出该worker,过程需要获取mainlock。然后调用tryTerminate()方法处理,该方法是对最后一个线程退出做终止线程池动作。如果线程池没有终止,那么线程池需要保持一定数量的线程,则通过addWorker(null,false)新增一个空的线程。

    在addWorker()方法中,如果线程t==null,或者在add过程出现异常,会导致workerStarted == false,那么在最后会调用addWorkerFailed()方法:
        private void addWorkerFailed(Worker w) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                if (w != null)
                // 从HashSet中移除该worker
                    workers.remove(w);
                // 线程数 - 1
                decrementWorkerCount();
                tryTerminate();
            } finally {
                mainLock.unlock();
            }
        }
    

    tryTerminate()

    当线程池涉及到要移除worker时候都会调用tryTerminate(),该方法主要用于判断线程池中的线程是否已经全部移除了,如果是的话则关闭线程池。

        final void tryTerminate() {
            for (;;) {
                int c = ctl.get();
                //1.start
                if (isRunning(c) ||
                    runStateAtLeast(c, TIDYING) ||
                    (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                    return;
                //1.end
                
                
                // 2.start
                if (workerCountOf(c) != 0) { // Eligible to terminate
                    interruptIdleWorkers(ONLY_ONE);
                    return;
                }
                //2.end
                final ReentrantLock mainLock = this.mainLock;
                
                //3.start
            /**
             * 如果状态是SHUTDOWN,workQueue也为空了,正在运行的worker也没有了,开始terminated
             */
    
                mainLock.lock();
                try {
            //CAS:将线程池的ctl变成TIDYING(所有的任务被终止,workCount为0,为此状态时将会调用terminated()方法)
            //期间ctl有变化就会失败,会再次for循环
                    if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                        try {
                            terminated();
                        } finally {
                            ctl.set(ctlOf(TERMINATED, 0));//将线程池的ctl变成TERMINATED
    
                            termination.signalAll();//唤醒调用了 等待线程池终止的线程 awaitTermination()
                        }
                        return;
                    }
                } finally {
                    mainLock.unlock();
                }
                // else retry on failed CAS
            }
        }
    
    
        private void interruptIdleWorkers(boolean onlyOne) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                for (Worker w : workers) {
                    Thread t = w.thread;
                    if (!t.isInterrupted() && w.tryLock()) {
                        try {
                            t.interrupt();
                        } catch (SecurityException ignore) {
                        } finally {
                            w.unlock();
                        }
                    }
                    if (onlyOne)
                        break;
                }
            } finally {
                mainLock.unlock();
            }
        }
    
    
      1. 如果线程处于run
      2. 或 如果 线程池已经终止了
      3. 或 线程池处于ShutDown状态,但是阻塞队列不为空
      1. 如果 线程处于STOP状态,要么处于SHUTDOWN且阻塞队列为空 如果 工作线程 不为空 中断操作.
      • 2.在interruptIdleWorkers()中断之前需要先tryLock()获取worker锁,意味着正在运行的worker不能中断,因为worker.tryLock()失败,且锁是不可重入的故shutdown()只有对能获取到worker锁的空闲线程(正在从workQueue中getTask(),此时worker没有加锁)发送中断信号
      由此可以将worker划分为:
      1、空闲worker:正在从workQueue阻塞队列中获取任务的worker
      2、运行中worker:正在task.run()执行任务的worker
      

      3.如果状态是SHUTDOWN,workQueue也为空了,正在运行的worker也没有了,开始terminated会先上锁,将线程池置为tidying状态,之后调用需子类实现的 terminated(),最后线程池置为terminated状态,并唤醒所有等待线程池终止这个Condition的线程

    线程终止

    shutdown()
        /**
         *发起一个有序的关闭,在此之前提交的任务被执行,
         *但是没有新的任务被接受。如果已经关闭,调用没有额外的效果。
         * Initiates an orderly shutdown in which previously submitted
         * tasks are executed, but no new tasks will be accepted.
         * Invocation has no additional effect if already shut down.
         *
         *当前方法不会等待之前提交的任务执行结束,可以使用awaitTermination()
         * <p>This method does not wait for previously submitted tasks to
         * complete execution.  Use {@link #awaitTermination awaitTermination}
         * to do that.
         *
         * @throws SecurityException {@inheritDoc}
         */
        public void shutdown() {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                //是否有权限
                checkShutdownAccess();
                //CAS+循环设置线程池状态为SHUTDOWN
                advanceRunState(SHUTDOWN);
                //中断所有空闲线程  (正在运行的不会被中断  同时 getTask()时  可以跳出阻塞)
                interruptIdleWorkers();
                //预留 子类实现 
                onShutdown(); // hook for ScheduledThreadPoolExecutor
            } finally {
                mainLock.unlock();
            }
            //清理 线程池
            tryTerminate();
        }
    
    
    • 1、上锁,mainLock是线程池的主锁,是可重入锁,当要操作workers set这个保持线程的HashSet时,需要先获取mainLock,还有当要处理largestPoolSize、completedTaskCount这类统计数据时需要先获取mainLock

    • 2、判断调用者是否有权限shutdown线程池

    • 3、使用CAS操作将线程池状态设置为shutdown,shutdown之后将不再接收新任务

    • 4、中断所有空闲线程 interruptIdleWorkers()

    • 5、onShutdown(),ScheduledThreadPoolExecutor中实现了这个方法,可以在shutdown()时做一些处理

    • 6、解锁

    • 7、尝试终止线程池 tryTerminate()

    --

    shutdownNow()
        /**
         *尝试停止所有活动的正在执行的任务,停止等待任务的处理,并返回正在等待被执行的任务列表
         *这个任务列表是从任务队列中排出(删除)的
         * Attempts to stop all actively executing tasks, halts the
         * processing of waiting tasks, and returns a list of the tasks
         * that were awaiting execution. These tasks are drained (removed)
         * from the task queue upon return from this method.
         *
         *这个方法不用等到正在执行的任务结束,要等待线程池终止可使用awaitTermination()
         * <p>This method does not wait for actively executing tasks to
         * terminate.  Use {@link #awaitTermination awaitTermination} to
         * do that.
         *
         *
         *除了尽力尝试停止运行中的任务,没有任何保证
         * 取消任务是通过Thread.interrupt()实现的,
         * 所以任何响应中断失败的    任务可能永远不会结束
         * <p>There are no guarantees beyond best-effort attempts to stop
         * processing actively executing tasks.  This implementation
         * cancels tasks via {@link Thread#interrupt}, so any task that
         * fails to respond to interrupts may never terminate.
         *
         * @throws SecurityException {@inheritDoc}
         */
        public List<Runnable> shutdownNow() {
            List<Runnable> tasks;
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                checkShutdownAccess();
                //CAS+循环设置线程池状态为stop
                advanceRunState(STOP);
                //中断所有线程,包括正在运行任务的
                interruptWorkers();
                 //将workQueue中的元素放入一个List并返回
                tasks = drainQueue();
            } finally {
                mainLock.unlock();
            }
        //尝试终止线程池
            tryTerminate();
            return tasks; //返回workQueue中未执行的任务
        }
    
    interruptWorkers() 很简单,循环对所有worker调用 interruptIfStarted(),其中会判断worker的AQS state是否大于0,即worker是否已经开始运作,再调用Thread.interrupt()
    
    需要注意的是,对于运行中的线程调用Thread.interrupt()并不能保证线程被终止,task.run()内部可能捕获了InterruptException,没有上抛,导致线程一直无法结束
    

    shutdownNow() 和 shutdown()的大体流程相似,差别是:

    • 1、将线程池更新为stop状态

    • 2、调用 interruptWorkers() 中断所有线程,包括正在运行的线程

    • 3、将workQueue中待处理的任务移到一个List中,并在方法最后返回,说明shutdownNow()后不会再处理workQueue中的任务

    awaitTermination() -- 等待线程池终止
        public boolean awaitTermination(long timeout, TimeUnit unit)
            throws InterruptedException {
            long nanos = unit.toNanos(timeout);
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                for (;;) {
                    //如果结束 返回
                    if (runStateAtLeast(ctl.get(), TERMINATED))
                        return true;
                    //超时
                    if (nanos <= 0)
                        return false;
                    nanos = termination.awaitNanos(nanos);
                }
            } finally {
                mainLock.unlock();
            }
        }
    
    

    在发出一个shutdown请求后,在以下3种情况发生之前,awaitTermination()都会被阻塞

    • 1、所有任务完成执行
    • 2、到达超时时间
    • 3、当前线程被中断
    /**
     * Wait condition to support awaitTermination
     */
    private final Condition termination = mainLock.newCondition();
    
    

    awaitTermination() 循环的判断线程池是否terminated终止 或 是否已经超过超时时间,然后通过termination这个Condition阻塞等待一段时间

    termination.awaitNanos() 是通过 LockSupport.parkNanos(this, nanosTimeout)实现的阻塞等待

    阻塞等待过程中发生以下具体情况会解除阻塞(对上面3种情况的解释):

    1、如果发生了 termination.signalAll()(内部实现是 LockSupport.unpark())会唤醒阻塞等待,且由于ThreadPoolExecutor只有在 tryTerminated()尝试终止线程池成功,将线程池更新为terminated状态后才会signalAll(),故awaitTermination()再次判断状态会return true退出

    2、如果达到了超时时间 termination.awaitNanos() 也会返回,此时nano==0,再次循环判断return false,等待线程池终止失败

    3、如果当前线程被 Thread.interrupt(),termination.awaitNanos()会上抛InterruptException,awaitTermination()继续上抛给调用线程,会以异常的形式解除阻塞

    故终止线程池并需要知道其是否终止可以用如下方式:

    executorService.shutdown();
    try{
        while(!executorService.awaitTermination(500, TimeUnit.MILLISECONDS)) {
            LOGGER.debug("Waiting for terminate");
        }
    } 
    catch (InterruptedException e) {
        //中断处理
    }
    
    

    参考文献

    【死磕Java并发】—–J.U.C之线程池:ThreadPoolExecutor

    Java线程池ThreadPoolExecutor使用和分析(三) - 终止线程池原理

    《Java 并发编程的艺术》

    相关文章

      网友评论

          本文标题:Java 并发编程基础系列 (三) 线程池 2 ——Thre

          本文链接:https://www.haomeiwen.com/subject/noexgxtx.html