知识整理,是对知识点的梳理,以前没什么感觉,但多写了几篇博客之后发现,我对某个知识点不是完全懂了,我的思路不清晰,在大脑中没有完整的知识体系,还有也发现自己的文本表达能力较弱,对某个知识的理解混乱没调理。所以开始加大写博客的力度,锻炼自己的能力以及分享,现在就是,我有个观点就是你把代码写出来不代表你就会了,你必须能表达出来,因为你会了是别人对你的认同,而不是自己对自己的认同。
我对网络请求进行封装以及研究AsyncTask 时,仔细研究过线程池ThreadPoolExecutor的源码,所以今天把ThreadPoolExecutor讲清。ThreadPoolExecutor就是起到管理线程的作用,就是保证线程的重用,防止一直重复创建线程,消耗大量资源,因为线程的创建时很消耗资源的。
1、ThreadPoolExecutor初始化
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
//核心线程数
this.corePoolSize = corePoolSize;
//最大线程数
this.maximumPoolSize = maximumPoolSize;
//请求等待队列
this.workQueue = workQueue;
//线程存活时间
this.keepAliveTime = unit.toNanos(keepAliveTime);
//线程创建工厂类
this.threadFactory = threadFactory;
this.handler = handler;
}
可以看出ThreadPoolExecutor主要包括corePoolSize 、maximumPoolSize 、workQueue 、keepAliveTime 、threadFactory 属性。以及一个很重要的属性ctl,AtomicInteger类型,原子数,保证原子操作,下面是对该属性注释的部分解释。
/**
* 整个线程池的控制状态,包含了两个属性:有效线程的数量、线程池的状态(runState)。
* workerCount,有效线程的数量
* runState, 线程池的状态
*
* ctl 包含32位数据,低29位存线程数,高3位存runState,这样runState有5个值:
*
* RUNNING: 接受新任务,处理任务队列中的任务
* SHUTDOWN: 不接受新任务,处理任务队列中的任务
* STOP: 不接受新任务,不处理任务队列中的任务
* TIDYING: 所有任务完成,线程数为0,然后执行terminated()
* TERMINATED: terminated() 已经完成
* 具体值:
* RUNNING:-536870912
* SHUTDOWN:0
* STOP:536870912
* TIDYING:1073741824
* TERMINATED:1610612736
*/
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// Packing and unpacking ctl
//获取runState值,线程池的运行状态
private static int runStateOf(int c) { return c & ~CAPACITY; }
//获取workerCount值,有效线程的数量
private static int workerCountOf(int c) { return c & CAPACITY; }
//将运行状态和线程池数组合成新的ctl值。
private static int ctlOf(int rs, int wc) { return rs | wc; }
//是否运行中
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
corePoolSize :
这些线程一直存活,就是只要当前线程数小于corePoolSize ,那么就会添加,而且就算当前没有任务,只要线程数不大于当前corePoolSize ,那么这些线程就会一直存活,当然如果调用allowCoreThreadTimeOut(true)方法,那么这些线程在没有任务的时候也会释放掉。
maximumPoolSize :
最大线程数,顾名思义就算当前线程池所持有的最多线程,如果超出这个数就会报异常。
workQueue :
请求等待队列,当当前线程数不小于corePoolSize 时,而workQueue 队列没有满,那么这时就会把请求放到workQueue 队列中,等待执行。
keepAliveTime :
线程等待存活时间,也就是当线程闲置下来时等待下次任务最长时间,默认情况下,这时对核心线程之外的线程的处理,也就是大于corePoolSize 的线程等待时间,当调用allowCoreThreadTimeOut(true)方法,如果当前没有任务,核心线程也会有存活时间。
threadFactory :
线程创建工厂类,创建线程,一般就是调用new Thread()创造线程,当然也可以自己继承Thread,添加自己需要的属性以及操作。
2、添加任务以及执行
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1.如果线程数小于corePoolSize ,创建一个新的线程执行任务
*
* 2. 如果当前线程数不小于corePoolSize ,尝试把任务添加到等待队列中。
*
* 3.如果不能把任务添加到请求队列中,则尝试创建一个新的线程,也就是线程数大于corePoolSize 小于maximumPoolSize 的情况。
*/
int c = ctl.get();
/**
* 当前有效线程数小于核心线程数,尝试添加任务
**/
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
/**
* 如果线程池处于RUNNING状态则尝试把请求加入等待队列
**/
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
/**
* 如果请求加入等待队列失败,则尝试创建新线程
**/
else if (!addWorker(command, false))
reject(command);
}
在这里一直调用了addWorker方法,可见该方法的重要性。
/*
/**
* @param firstTask 等待执行的任务
*
* @param core 是否把任务添加到核心线程中执行 ,true用 核心线程数判断执行,false 用最大线程数判断执行
* @return true if successful
*/
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
// 获取线程池状态
int rs = runStateOf(c);
// 不添加firstTask条件
1、线程池状态不能为RUNNING
2、当线程池状态为SHUTDOWN 时,firstTask为null,并且workQueue不为空
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
// 获取当前线程数
int wc = workerCountOf(c);
//第一个添加不可能,看第二个,当core为true时,添加的是核心线程数中,,当core为false时,添加的是最大线程数中。如果当前线程数大于等于要求线程数(core为true,corePoolSize :core为false,maximumPoolSize)则不添加任务,
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//最新的线程数和addWorker一开始获取的值进行比较,看有没有被修改,如果修改过了则重新进行判断,这里用到了goto语句。就是跳到上面retry的地方,重新尝试的意思。
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
//和上面方法一样,只是这里判断线程池运行状态有没有改变。
if (runStateOf(c) != rs)
continue retry;
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//把任务封装在Worker中,最后都会在Worker中执行,并创建一个新线程。
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
//锁,保证线程同步
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//获取线程池状态
int rs = runStateOf(ctl.get());
// 如果线程池处于RUNNING状态执行添加任务操作,或线程池处于SHUTDOWN 状态,firstTask 为空。
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
//线程执行
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
可以看出addWorker方法主要是生成新的线程,而线程的重用则在Worker类中实现。
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** worker持有的线程 */
final Thread thread;
/** worker正在执行的任务 ,可能为null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
/**
* 创建Worker时会同时创建一个新线程.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
//把Worker传递给新建的线程,当线程执行是会调用Worker的run方法。
this.thread = getThreadFactory().newThread(this);
}
/** 线程执行时会调用该方法 */
public void run() {
runWorker(this);
}
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
最后执行runWorker方法:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//获取要执行的任务,getTask()是线程阻塞的,当然有判断条件,下面再说。也就是说只有只有getTask()获取任务为null时,线程就会结束。
while (task != null || (task = getTask()) != null) {
w.lock();
//逻辑复杂,就是在在STOP以及大于STOP状态时尝试中断线程。
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//任务真正执行前操作
beforeExecute(wt, task);
Throwable thrown = null;
try {
//执行任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
线程获取待执行任务方法,从上面的分析可以看出,这里才是线程重用的关键,所以下面分析getTask方法。
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 当线程池处于STOP及以上状态时,线程数减一,该线程不使用。
//当线程处于SHUTDOWN 状态时,并且workQueue请求队列为空,释放该线程。
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
//获取当前线程数
int wc = workerCountOf(c);
// 如果调用allowCoreThreadTimeOut方法设置为true,则所有线程都有超时时间。
//如果当前线程数大于核心线程数则该线程有超时时间。
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
//poll方法为线程阻塞方法,keepAliveTime为阻塞最长时间,若超时则返回null,
//take()方法没有超时时间,会一直获取。也就是说在这里不断获取任务,
//也就是如果线程池处于RUNNING、SHUTDOWN状态时,只要等待队列不为空,那么线程就会一直执行。这也就是线程重用的原理。
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
3、结束线程池
ThreadPoolExecutor有两个结束的方法shutdown、shutdownNow。shutdown是把线程池状态转为SHUTDOWN,这时等待队列中的任务可以继续执行;
shutdownNow方法是把线程池状态转为SHUTDOWN,这时等待队列中的任务不可以继续执行,只能执行已经执行的任务;
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
//把线程池状态改为SHUTDOWN
advanceRunState(SHUTDOWN);
// 中断所有空闲线程
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
//把线程池状态改为STOP
advanceRunState(STOP);
// 中断所有空闲线程
interruptWorkers();
// 返回队列中还没有被执行的任务
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
advanceRunState 改变线程池的状态
//把线程池状态改为目标状态targetState
private void advanceRunState(int targetState) {
for (;;) {
int c = ctl.get();
if (runStateAtLeast(c, targetState) ||
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
break;
}
}
interruptIdleWorkers中断线程
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
//线程没有被中断并且Worker 正在获取任务中,就是空闲中。线程中断
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
getTask方法中可以看出如果线程池处于STOP已经以上状态时不会继续获取任务,而是尝试中断线程,这也就是shutdown、shutdownNow的区别。我查找资料发现这些内容:
1、ReentrantLock.lockInterruptibly允许在等待时由其它线程调用等待线程的Thread.interrupt方法来中断等待线程的等待而直接返回,这时不用获取锁,而会抛出一个InterruptedException。
然后我们进入Executors的生成方法,发现使用的是LinkedBlockingQueue类,而LinkedBlockingQueue的take()方法如下
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
//调用了lockInterruptibly方法
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
所以当线程在获取任务阻塞时,如果该线程被调用了interupt方法,则该线程释放,所以说释放空闲线程。
网友评论