美文网首页
ThreadPoolExecutor 源码浅析(二)

ThreadPoolExecutor 源码浅析(二)

作者: WJL3333 | 来源:发表于2018-03-11 00:12 被阅读17次

    这篇首先增加了几个遗漏的方法

    之后整理了阅读源码中遇到的几个问题.

    1. shutdown方法
    public void shutdown() {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                checkShutdownAccess();
                advanceRunState(SHUTDOWN);  //更新ctl的状态
                interruptIdleWorkers();     //中断空闲进程
                onShutdown(); // hook for ScheduledThreadPoolExecutor
            } finally {
                mainLock.unlock();
            }
            tryTerminate(); 
        }
    
    1. awaitTermination
    public boolean awaitTermination(long timeout, TimeUnit unit)
            throws InterruptedException {
            long nanos = unit.toNanos(timeout);
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                for (;;) {
                    if (runStateAtLeast(ctl.get(), TERMINATED))
                        return true;
                    if (nanos <= 0)
                        return false;
                    // termination是mainlock上的一个条件.
                    /// 整个方法只是等待在该条件上之后返回
                    nanos = termination.awaitNanos(nanos);
                }
            } finally {
                mainLock.unlock();
            }
        }
    
    1. setCorePoolSize
    //该方法可以在运行过程中动态调整corePoolSize
    //改变后如果线程数大于core则尝试中断空闲进程
    //如果coresize比原来大则尝试增加coreWorker的数目
    //如果增量大于queue中的任务数量的话则只保证队列中的任务都有Worker来运行.
    public void setCorePoolSize(int corePoolSize) {
            if (corePoolSize < 0)
                throw new IllegalArgumentException();
            int delta = corePoolSize - this.corePoolSize;
            this.corePoolSize = corePoolSize;
            if (workerCountOf(ctl.get()) > corePoolSize)
                interruptIdleWorkers();
            else if (delta > 0) {
                // We don't really know how many new threads are "needed".
                // As a heuristic, prestart enough new workers (up to new
                // core size) to handle the current number of tasks in
                // queue, but stop if queue becomes empty while doing so.
                int k = Math.min(delta, workQueue.size());
                while (k-- > 0 && addWorker(null, true)) {
                    if (workQueue.isEmpty())
                        break;
                }
            }
        }
    

    问题

    1. 该类是怎样保证线程安全的
    2. tryTerminate方法的作用
    3. 在什么情况下会添加线程,任务是怎样从工作队列中取出的
    4. 工作线程如果被中断或者出现异常是怎样处理的.
    5. 超时时间是怎样体现的.超时之后工作线程是怎样处理的.
    6. iterruptIdleThread方法的作用
    7. mainlock作用

    1.该类是怎样保证线程安全的

    从暴露的api来看(public方法),

    • get,set方法:

      • volatile变量的getset方法只是简单返回了变量本身,而且这些变量只是设定的数值,并不真正涉及线程池的状态.所以是线程安全的.

      • 状态统计方法getTaskCount(),可以看到这个类使用mainlock,对每个Worker进行状态统计.因为workers这个HashSet不是线程安全的所以使用了mainlock保证这个集合在迭代过程中不会改变.

    public long getTaskCount() {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                long n = completedTaskCount;
                for (Worker w : workers) {
                    n += w.completedTasks;
                    if (w.isLocked())
                        ++n;
                }
                return n + workQueue.size();
            } finally {
                mainLock.unlock();
            }
        }
    
    • 除去get,set方法之外,会在被多个线程同时调用的有这几个方法

      • execute(), 这个方法主要调用了addWorker()方法, (对线程池运行状态的访问是CAS操作)
        addWorker() 创建线程添加到workers (HashSet)使用mainlock保证了对workers状态 更改是线程安全的.
    • remove(Runnable r) -> 委托给BlockingQueue.remove
    • shutdown(),shutdownNow(),awaitTermination() -> 使用mainlock

    2. tryTerminate()方法的作用

    代码中调用该方法的方法有

    • addWorkerFailed
    • processWorkerExit
    • shutdown ,shutdownNow

    这个方法只有在状态为( SHUTDOWN, STOP, TIDYING) 时才会生效.
    在没有工作线程运行时会保证terminated只被调用一次.
    否则会中断没有停止的线程.

    shutdown,shutdownNow调用前会中断所有空闲线程,调用该方法的目的是调用terminated方法

    processWorkerExitaddWorkerFailed方法则是为了线程退出或者创建线程后把这种线程中断掉.

    如果不中断掉的话,shutdown则无法退出.

    3.在什么情况下会添加线程,任务是怎样从工作队列中取出的

    • getTask() 从工作队列中取出
    • 添加线程
      • processWorkerExit线程退出后如果线程数少于core,会调用addWorker
      • execute新任务提交时, 少于 core 创建线程,工作队列满
      • 动态调整 core 时,如果core变大,调用addWorker

    4.工作线程如果被中断或者出现异常是怎样处理的.

    runWorker的逻辑类似于下面的代码

    final Runnable task2 = () ->{
        if(Thread.currentThread().isInterrupted()){
            System.out.println("iterrupted");
            return;
        }
        System.out.println(2);
    
    };
    Runnable task = () ->{
        Exception exp = null;
        try {
            Thread.currentThread().interrupt();
            System.out.println(1);
            task2.run();
        }catch (Exception e) {
            exp = e; throw e;
        }finally {
            if(exp!=null){
                exp.printStackTrace();
            }
        }
    };
    
    Thread t = new Thread(task);
    t.start();
    

    task2相当于传入线程池的Runnable任务.

    task相当于runWorker方法.

    逻辑为如果线程池处于即将停止状态Worker中的线程就会被中断.

    如果task2(传入的Runnable任务)中没有处理中断的逻辑的话,该线程还是不会退出.

    出现异常的话会被task2.run()外部的catch块接受到异常,之后异常会传递到afterExecute回调中

    而且出现异常的话该线程会被processWorkerExit处理并从workers中移除.

    5.超时时间是怎样体现的.超时之后工作线程是怎样处理的.

    超时体现在从工作线程中取得任务getTask方法,如果从workQueue中限时取得任务超时,则返回null

    这样runWorker方法中的while方法会退出,之后执行processWorkerExit逻辑,移除工作线程

    6.iterruptIdleThread方法的作用

    这个方法会中断所有线程(参数为false时),用于在shutdown中减少空闲线程数目.

    在动态调整线程池时也会调用该方法allowCoreThreadTimeOut,setMaximumPoolSize

    7.mainlock作用

    保证外界改变线程池状态时的线程安全.

    tryTerminate中确保terminated回调只调用一次

    访问workers``HashSet确保线程安全

    相关文章

      网友评论

          本文标题:ThreadPoolExecutor 源码浅析(二)

          本文链接:https://www.haomeiwen.com/subject/hklifftx.html