美文网首页
ThreadPoolExecutor 源码浅析(二)

ThreadPoolExecutor 源码浅析(二)

作者: WJL3333 | 来源:发表于2018-03-11 00:12 被阅读17次

这篇首先增加了几个遗漏的方法

之后整理了阅读源码中遇到的几个问题.

  1. shutdown方法
public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(SHUTDOWN);  //更新ctl的状态
            interruptIdleWorkers();     //中断空闲进程
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        tryTerminate(); 
    }
  1. awaitTermination
public boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (;;) {
                if (runStateAtLeast(ctl.get(), TERMINATED))
                    return true;
                if (nanos <= 0)
                    return false;
                // termination是mainlock上的一个条件.
                /// 整个方法只是等待在该条件上之后返回
                nanos = termination.awaitNanos(nanos);
            }
        } finally {
            mainLock.unlock();
        }
    }
  1. setCorePoolSize
//该方法可以在运行过程中动态调整corePoolSize
//改变后如果线程数大于core则尝试中断空闲进程
//如果coresize比原来大则尝试增加coreWorker的数目
//如果增量大于queue中的任务数量的话则只保证队列中的任务都有Worker来运行.
public void setCorePoolSize(int corePoolSize) {
        if (corePoolSize < 0)
            throw new IllegalArgumentException();
        int delta = corePoolSize - this.corePoolSize;
        this.corePoolSize = corePoolSize;
        if (workerCountOf(ctl.get()) > corePoolSize)
            interruptIdleWorkers();
        else if (delta > 0) {
            // We don't really know how many new threads are "needed".
            // As a heuristic, prestart enough new workers (up to new
            // core size) to handle the current number of tasks in
            // queue, but stop if queue becomes empty while doing so.
            int k = Math.min(delta, workQueue.size());
            while (k-- > 0 && addWorker(null, true)) {
                if (workQueue.isEmpty())
                    break;
            }
        }
    }

问题

  1. 该类是怎样保证线程安全的
  2. tryTerminate方法的作用
  3. 在什么情况下会添加线程,任务是怎样从工作队列中取出的
  4. 工作线程如果被中断或者出现异常是怎样处理的.
  5. 超时时间是怎样体现的.超时之后工作线程是怎样处理的.
  6. iterruptIdleThread方法的作用
  7. mainlock作用

1.该类是怎样保证线程安全的

从暴露的api来看(public方法),

  • get,set方法:

    • volatile变量的getset方法只是简单返回了变量本身,而且这些变量只是设定的数值,并不真正涉及线程池的状态.所以是线程安全的.

    • 状态统计方法getTaskCount(),可以看到这个类使用mainlock,对每个Worker进行状态统计.因为workers这个HashSet不是线程安全的所以使用了mainlock保证这个集合在迭代过程中不会改变.

public long getTaskCount() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            long n = completedTaskCount;
            for (Worker w : workers) {
                n += w.completedTasks;
                if (w.isLocked())
                    ++n;
            }
            return n + workQueue.size();
        } finally {
            mainLock.unlock();
        }
    }
  • 除去get,set方法之外,会在被多个线程同时调用的有这几个方法

    • execute(), 这个方法主要调用了addWorker()方法, (对线程池运行状态的访问是CAS操作)
      addWorker() 创建线程添加到workers (HashSet)使用mainlock保证了对workers状态 更改是线程安全的.
  • remove(Runnable r) -> 委托给BlockingQueue.remove
  • shutdown(),shutdownNow(),awaitTermination() -> 使用mainlock

2. tryTerminate()方法的作用

代码中调用该方法的方法有

  • addWorkerFailed
  • processWorkerExit
  • shutdown ,shutdownNow

这个方法只有在状态为( SHUTDOWN, STOP, TIDYING) 时才会生效.
在没有工作线程运行时会保证terminated只被调用一次.
否则会中断没有停止的线程.

shutdown,shutdownNow调用前会中断所有空闲线程,调用该方法的目的是调用terminated方法

processWorkerExitaddWorkerFailed方法则是为了线程退出或者创建线程后把这种线程中断掉.

如果不中断掉的话,shutdown则无法退出.

3.在什么情况下会添加线程,任务是怎样从工作队列中取出的

  • getTask() 从工作队列中取出
  • 添加线程
    • processWorkerExit线程退出后如果线程数少于core,会调用addWorker
    • execute新任务提交时, 少于 core 创建线程,工作队列满
    • 动态调整 core 时,如果core变大,调用addWorker

4.工作线程如果被中断或者出现异常是怎样处理的.

runWorker的逻辑类似于下面的代码

final Runnable task2 = () ->{
    if(Thread.currentThread().isInterrupted()){
        System.out.println("iterrupted");
        return;
    }
    System.out.println(2);

};
Runnable task = () ->{
    Exception exp = null;
    try {
        Thread.currentThread().interrupt();
        System.out.println(1);
        task2.run();
    }catch (Exception e) {
        exp = e; throw e;
    }finally {
        if(exp!=null){
            exp.printStackTrace();
        }
    }
};

Thread t = new Thread(task);
t.start();

task2相当于传入线程池的Runnable任务.

task相当于runWorker方法.

逻辑为如果线程池处于即将停止状态Worker中的线程就会被中断.

如果task2(传入的Runnable任务)中没有处理中断的逻辑的话,该线程还是不会退出.

出现异常的话会被task2.run()外部的catch块接受到异常,之后异常会传递到afterExecute回调中

而且出现异常的话该线程会被processWorkerExit处理并从workers中移除.

5.超时时间是怎样体现的.超时之后工作线程是怎样处理的.

超时体现在从工作线程中取得任务getTask方法,如果从workQueue中限时取得任务超时,则返回null

这样runWorker方法中的while方法会退出,之后执行processWorkerExit逻辑,移除工作线程

6.iterruptIdleThread方法的作用

这个方法会中断所有线程(参数为false时),用于在shutdown中减少空闲线程数目.

在动态调整线程池时也会调用该方法allowCoreThreadTimeOut,setMaximumPoolSize

7.mainlock作用

保证外界改变线程池状态时的线程安全.

tryTerminate中确保terminated回调只调用一次

访问workers``HashSet确保线程安全

相关文章

网友评论

      本文标题:ThreadPoolExecutor 源码浅析(二)

      本文链接:https://www.haomeiwen.com/subject/hklifftx.html