这篇首先增加了几个遗漏的方法
之后整理了阅读源码中遇到的几个问题.
- shutdown方法
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN); //更新ctl的状态
interruptIdleWorkers(); //中断空闲进程
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
- awaitTermination
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (;;) {
if (runStateAtLeast(ctl.get(), TERMINATED))
return true;
if (nanos <= 0)
return false;
// termination是mainlock上的一个条件.
/// 整个方法只是等待在该条件上之后返回
nanos = termination.awaitNanos(nanos);
}
} finally {
mainLock.unlock();
}
}
- setCorePoolSize
//该方法可以在运行过程中动态调整corePoolSize
//改变后如果线程数大于core则尝试中断空闲进程
//如果coresize比原来大则尝试增加coreWorker的数目
//如果增量大于queue中的任务数量的话则只保证队列中的任务都有Worker来运行.
public void setCorePoolSize(int corePoolSize) {
if (corePoolSize < 0)
throw new IllegalArgumentException();
int delta = corePoolSize - this.corePoolSize;
this.corePoolSize = corePoolSize;
if (workerCountOf(ctl.get()) > corePoolSize)
interruptIdleWorkers();
else if (delta > 0) {
// We don't really know how many new threads are "needed".
// As a heuristic, prestart enough new workers (up to new
// core size) to handle the current number of tasks in
// queue, but stop if queue becomes empty while doing so.
int k = Math.min(delta, workQueue.size());
while (k-- > 0 && addWorker(null, true)) {
if (workQueue.isEmpty())
break;
}
}
}
问题
- 该类是怎样保证线程安全的
- tryTerminate方法的作用
- 在什么情况下会添加线程,任务是怎样从工作队列中取出的
- 工作线程如果被中断或者出现异常是怎样处理的.
- 超时时间是怎样体现的.超时之后工作线程是怎样处理的.
- iterruptIdleThread方法的作用
- mainlock作用
1.该类是怎样保证线程安全的
从暴露的api来看(public方法),
-
get,set方法:
-
volatile变量的getset方法只是简单返回了变量本身,而且这些变量只是设定的数值,并不真正涉及线程池的状态.所以是线程安全的.
-
状态统计方法
getTaskCount()
,可以看到这个类使用mainlock,对每个Worker进行状态统计.因为workers这个HashSet不是线程安全的所以使用了mainlock保证这个集合在迭代过程中不会改变.
-
public long getTaskCount() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
long n = completedTaskCount;
for (Worker w : workers) {
n += w.completedTasks;
if (w.isLocked())
++n;
}
return n + workQueue.size();
} finally {
mainLock.unlock();
}
}
-
除去get,set方法之外,会在被多个线程同时调用的有这几个方法
-
execute()
, 这个方法主要调用了addWorker()
方法, (对线程池运行状态的访问是CAS操作)
addWorker()
创建线程添加到workers (HashSet)使用mainlock保证了对workers状态 更改是线程安全的.
-
-
remove(Runnable r)
-> 委托给BlockingQueue.remove
-
shutdown()
,shutdownNow()
,awaitTermination()
-> 使用mainlock
2. tryTerminate()方法的作用
代码中调用该方法的方法有
addWorkerFailed
processWorkerExit
-
shutdown
,shutdownNow
这个方法只有在状态为( SHUTDOWN, STOP, TIDYING) 时才会生效.
在没有工作线程运行时会保证terminated只被调用一次.
否则会中断没有停止的线程.
shutdown
,shutdownNow
调用前会中断所有空闲线程,调用该方法的目的是调用terminated方法
而processWorkerExit
和addWorkerFailed
方法则是为了线程退出或者创建线程后把这种线程中断掉.
如果不中断掉的话,shutdown
则无法退出.
3.在什么情况下会添加线程,任务是怎样从工作队列中取出的
- getTask() 从工作队列中取出
- 添加线程
-
processWorkerExit
线程退出后如果线程数少于core,会调用addWorker
-
execute
新任务提交时, 少于 core 创建线程,工作队列满 - 动态调整
core
时,如果core变大,调用addWorker
-
4.工作线程如果被中断或者出现异常是怎样处理的.
runWorker
的逻辑类似于下面的代码
final Runnable task2 = () ->{
if(Thread.currentThread().isInterrupted()){
System.out.println("iterrupted");
return;
}
System.out.println(2);
};
Runnable task = () ->{
Exception exp = null;
try {
Thread.currentThread().interrupt();
System.out.println(1);
task2.run();
}catch (Exception e) {
exp = e; throw e;
}finally {
if(exp!=null){
exp.printStackTrace();
}
}
};
Thread t = new Thread(task);
t.start();
task2相当于传入线程池的Runnable任务.
task相当于runWorker
方法.
逻辑为如果线程池处于即将停止状态Worker中的线程就会被中断.
如果task2(传入的Runnable任务)中没有处理中断的逻辑的话,该线程还是不会退出.
出现异常的话会被task2.run()
外部的catch块接受到异常,之后异常会传递到afterExecute
回调中
而且出现异常的话该线程会被processWorkerExit
处理并从workers中移除.
5.超时时间是怎样体现的.超时之后工作线程是怎样处理的.
超时体现在从工作线程中取得任务getTask
方法,如果从workQueue
中限时取得任务超时,则返回null
这样runWorker
方法中的while方法会退出,之后执行processWorkerExit
逻辑,移除工作线程
6.iterruptIdleThread方法的作用
这个方法会中断所有线程(参数为false时),用于在shutdown
中减少空闲线程数目.
在动态调整线程池时也会调用该方法allowCoreThreadTimeOut
,setMaximumPoolSize
7.mainlock作用
保证外界改变线程池状态时的线程安全.
tryTerminate
中确保terminated
回调只调用一次
访问workers``HashSet
确保线程安全
网友评论