与线程池相关的核心类为ThreadPoolExecutor,但是在实际应用中,我们并不会调用这个类来使用线程池,而是使用Executors,Executors类封装了生成几种不同线程池的方法。
线程池的用法
ExecutorService es = Executors.newCachedThreadPool();
es.execute(Runnable run);
所以从execute(Runnable command)方法讲起,这个方法会将任务提交给一个线程进行执行
execute(Runnable command)方法源码:
//调用execute方法会将线程提交到线程池中
public void execute(Runnable command) {
//如果任务为null,则抛出异常
if (command == null)
throw new NullPointerException();
//1.如果当前线程数量小于corePoolSize的大小
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
//addWorker方法会进一步进行检查,因为有可能其他线程对线程池的状态做了改变
//addWorker方法中进行两个检查1)线程池的状态 2)当前线程数量是否超过corePoolSize的大小
//检查完成以后如果都否合要求,创建一个Worker,new Worker(Runnable r),如果条件满足,会将这个worker加入到HashSet<Worker>中去,并且返回true
if (addWorker(command, true))
return;
c = ctl.get();
}
//2.如果corePoolSize已经满了,则需要加入到阻塞队列
//会进行一个判断,线程池的状态以及是否可以往阻塞队列中继续添加runnable
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//在进行一次判断,这个判断主要是为了有其他线程调用了shutDown或者shutDownNow方法,这时候如果再有任务就会拒绝执行
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//如果此时队列已满,则会采取相应的拒绝策略
//addWorker中第二参数boolean core,如果false,则边界为maxmunSize,如果为true,则边界为corePoolSize
else if (!addWorker(command, false))
reject(command);
}
execute(Runnable command)方法战略三步走
1.往corePoolSize中加入任务进行执行
2.当corePoolSize满时往阻塞队列中加入任务
3.阻塞队列满时并且maximumPoolSize已满,则采取相应的拒绝策略
因为execute(Runnable command)方法没有加锁,所以做了很多相同的判断,因为很有可能这个线程在执行execute方法时有其他线程已经完成了execute方法并且改变了线程池的状态(比如可能因为其他线程的执行导致corePoolSize已满,或者其他线程调用了shutDown()或者shutDownNow()方法拒绝在接受新任务)
可以看出execute(Runnable command)方法中最核心的方法是addWorker(Runnable firstTask, boolean core)
先看addWorker(Runnable firstTask, boolean core)方法的源码:
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
//返回线程池的状态
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//打破死循环的关键
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//为什么叫addWorker方法,从这里就可以看出,创建了一个Worker对象,并且Runnable是其里面的一个字段
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
//正真执行到往corePoolSize添加任务时会进行一个加锁操作
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
//在加锁之后还会进行一个判断
//判断1.线程池是否在running 2.任务是否被移除
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//往corePoolSize中加入任务
workers.add(w);
int s = workers.size();
//调整曾经线程池拥有最大线程数的大小
if (s > largestPoolSize)
largestPoolSize = s;
//改变状态
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
//运行该任务,t.start方法最终会调用native void start0()方法
t.start();
//改变状态
workerStarted = true;
}
}
} finally {
//如果加入失败
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
从代码中看出其实corePoolSize中就是维护了一个HashSet<Worker>
并且在真正往corePoolSize加入任务时会进行一个加锁,加完锁还会做一个验证
很多细节还待以后继续补充,现在学习源码的功力还不够
网友评论