线程池可以降低系统线程的重复创建销毁频率,给用户提供了很方便的多任务执行方式。
ThreadPoolExecutor
java中线程池的核心实现是ThreadPoolExecutor。
ThreadPoolExecutor的继承关系如下
ThreadPoolExecutor的构造函数有很多,但是最后都是调用到了这个。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
参数的含义依次是:
- corePoolSize ,核心线程的个数
- maximumPoolSize,线程池能开启的线程最大值
- keepAliveTime,unit 额外线程超时多久不工作,将销毁,如果调用了对象的allowCoreThreadTimeOut方法,将允许所有线程超时退出。
- workQueue 任务队列,用来缓存消息的
- threadFactory 线程创建工厂,可以用来设置线程的一些属性,比如线程组,名字,优先级,是否守护等。
-
handler 处理不了的消息将会调用这个对象的rejectedExecution方法来执行拒绝策略,可以看到ThreadPoolExecutor提供了几种默认实现。
1.AbortPolicy 默认的处理方式,抛出异常。
2.DiscardPolicy 啥也不干
3.rejectedExecution 将缓存队列的头部(poll方法)取出,重新尝试一次
4.CallerRunsPolicy 直接使用调用方的线程执行任务的run操作。
我们主要关注这几个方法:execute/submit,shutdown/shutdownNow。首先是 execute
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
可以看到初步的逻辑,如果当前的线程数量没达到核心线程数, 则直接创建线程执行任务,否则尝试调用队列的offer方法,将任务塞入队列,如果塞入失败,则执行拒绝策略。
进一步看看addWorker方法。
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
此函数做的事情有:1.判断线程是否超限,是则退出,否则用cas的方式给工作线程数量自增。2.创建Worker对象,绑定一个新建线程和首个任务并start该线程。Worker是ThreadPoolExecutor实现了Runnable方法的内部类(可以直接保存外部类对象,所以可以直接访问外部类的方法)。
线程启动以后执行如下方法:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
- 1.代码比较清晰,获取首个任务/ThreadPoolExecutor对象的缓存队列里拿任务,并执行。看一下getTask方法。
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
判断线程池是否被shutdown,随时准备退出。基于核心线程数量和用户是否允许核心线程退出,判断当前线程是否需超时退出。分别执行poll或take方法。这两个方法是阻塞的,等到任务或者超时了才返回。如果超时了还没等到任务,这个线程将退出。
Executor中默认提供的线程池实现
Executor中提供了几个默认的线程池实现,实际都是调用了ThreadPoolExecutor来创建的,我们先看看会使用到的任务队列。
-
1.LinkedBlockingQueue 如其名,内部实现是一个FIFO的单向链表,比较简单,略过。
-
2.SynchronousQueue 同步队列,生产者和消费者需要同时触发生产和消费的动作时,才能成功交换数据。SynchronousQueue 的poll,take,offer,put等方法操作实际是由内部的Transferer接口的transfer来完成的,这个接口有两种实现,TransferStack(FILO)和TransferQueue(FIFO)。下面以TransferStack为例作分析。
E transfer(E e, boolean timed, long nanos) {
/*
* Basic algorithm is to loop trying one of three actions:
*
* 1. If apparently empty or already containing nodes of same
* mode, try to push node on stack and wait for a match,
* returning it, or null if cancelled.
*
* 2. If apparently containing node of complementary mode,
* try to push a fulfilling node on to stack, match
* with corresponding waiting node, pop both from
* stack, and return matched item. The matching or
* unlinking might not actually be necessary because of
* other threads performing action 3:
*
* 3. If top of stack already holds another fulfilling node,
* help it out by doing its match and/or pop
* operations, and then continue. The code for helping
* is essentially the same as for fulfilling, except
* that it doesn't return the item.
*/
SNode s = null; // constructed/reused as needed
int mode = (e == null) ? REQUEST : DATA;
for (;;) {
SNode h = head;
if (h == null || h.mode == mode) { // empty or same-mode
if (timed && nanos <= 0) { // can't wait
if (h != null && h.isCancelled())
casHead(h, h.next); // pop cancelled node
else
return null;
} else if (casHead(h, s = snode(s, e, h, mode))) {
SNode m = awaitFulfill(s, timed, nanos);
if (m == s) { // wait was cancelled
clean(s);
return null;
}
if ((h = head) != null && h.next == s)
casHead(h, s.next); // help s's fulfiller
return (E) ((mode == REQUEST) ? m.item : s.item);
}
} else if (!isFulfilling(h.mode)) { // try to fulfill
if (h.isCancelled()) // already cancelled
casHead(h, h.next); // pop and retry
else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
for (;;) { // loop until matched or waiters disappear
SNode m = s.next; // m is s's match
if (m == null) { // all waiters are gone
casHead(s, null); // pop fulfill node
s = null; // use new node next time
break; // restart main loop
}
SNode mn = m.next;
if (m.tryMatch(s)) {
casHead(s, mn); // pop both s and m
return (E) ((mode == REQUEST) ? m.item : s.item);
} else // lost match
s.casNext(m, mn); // help unlink
}
}
} else { // help a fulfiller
SNode m = h.next; // m is h's match
if (m == null) // waiter is gone
casHead(h, null); // pop fulfilling node
else {
SNode mn = m.next;
if (m.tryMatch(h)) // help match
casHead(h, mn); // pop both h and m
else // lost match
h.casNext(m, mn); // help unlink
}
}
}
}
-
第一个参数e为空的话,则是获取。以take为例,如果没有生产者,即当前队列为空,新建一个节点放入栈顶,同时调用awaitFulfill(内部会将线程信息放入节点,调用UnSafe类的park方法将当前线程挂起),后续还有消费者进来的话也是阻塞等待唤醒,直到有个生产者进来查询栈顶的waiter信息并唤醒它。
-
第一个参数有值的话, 则是插入,过程和上面类似,有put的poll方法。
- newCachedThreadPool
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
核心线程数为0,最大线程数未限制,用的是同步队列,所以特点是:任务消耗完一分钟之后,线程就全部销毁;如果在线程就绪的时候,塞入任务,可以马上被就绪线程消耗掉,如果没有空闲的线程,则开启新的线程来执行,所以会有线程开辟过多的风险。
- newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
固定核心线程数量,但是队列是无限大的。任务过多且来不及处理可能会造成oom。
- newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
newFixedThreadPool 的特殊情况,只有一个线程。
- newScheduledThreadPool
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory);
}
固定核心线程,最大线程数量不做限制,不同的是,这个方法返回的是个ScheduledExecutorService,ScheduledExecutorService继承于ExecutorService,所以有了更多的功能,可以调用scheduleAtFixedRate来完成延时任务的执行,这里有疑问,DelayedWorkQueue队列大小是没有做限制的,创建线程池时用的最大线程数量Integer.MAX_VALUE好像无实际作用。
网友评论