日常使用线程池基本都是使用 Executors 提供给我们设计好的各色线程池对象了,我们点进去看看:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
可以看到3个都是使用了 ThreadPoolExecutor 这个类,1个用的是 ScheduledThreadPoolExecutor 这个类,他们的区别是各自的参数不同,那么我们要研究线程池的实现就得看取去看 ThreadPoolExecutor 这个类了
ThreadPoolExecutor、ScheduledThreadPoolExecutor 这2个类都是实现了 ExecutorService 这个线程池的接口,而 ExecutorService 又继承了 Executor 这个更深层次的接口,看下 UML 类图:
1. ThreadPoolExecutor 构造方法
ThreadPoolExecutor 这个类的构造方法上篇文章有说过,可以自己设置参数从而实现自己的线程调度器,这里我再放一下
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
没什么好说的,大家可以重点关注下 Executors 各个工厂方法中传入的参数有何不同,如何实现自己的意图的
2. ThreadPoolExecutor.execute 方法
ThreadPoolExecutor.execute() 是线程池的核心,也是研究线程池原理的不二入口
public void execute(Runnable task) {
//取出当前线程池活跃的线程数。
//ctl是一个原子类型的对象(final AtomicInteger ctl),用来保存当前线程池的线程数以及线程池的状态。
int c = ctl.get();
//如果当前的活跃线程数小于核心线程数,即使现在有空闲线程,也创建一个新的线程,去执行这个任务
if (workerCountOf(c) < corePoolSize) {
//创建一个新的线程,去执行这个任务。
if (addWorker(task, true))
return;
//如果执行到这一句说明任务没有分配成功。
//所以获得当前线程池状态值,为后面的检查做准备。
c = ctl.get();
}
//如果大于核心线程数,检查一下线程池是否还处于运行状态,并尝试把任务放入到blockingQueue任务队列中。
if (isRunning(c) && workQueue.offer(task)) {
//这里再次检查一下线程池的状态
int recheck = ctl.get();
if (! isRunning(recheck) && remove(task))
//如果线程池不处于运行状态的话,就把我们刚才添加进任务队列中的任务移出,并拒绝这个任务。
reject(task);
//检查如果当前线程池中的线程数,如果为0了,就为线程池创建新线程(因为有可能之前存活的线程在上一次检查过后死亡了)
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//执行到这一句,说明队列满了。这时,如果当前线程池中的线程数还没有超过最大线程数,就创建一个新的线程去执行这个任务,如果失败就拒绝这个任务。
else if (!addWorker(task, false))
reject(task);
}
我在代码里加上了注解,这样看着舒服,看到没 ThreadPoolExecutor 是如何添加任务的,启动新线程的
代码中的亮点我觉得就是 if 判断了,我们写 if 时至少是我自己都是往上罗各种状态,没有其他的扩展,但是大家看到没,官方代码里面运用了 &&、|| 的特性,&& 2个都是 true 才是 true ,前面的要是 false 了,后面的就不用运行了,比如 if (isRunning(c) && workQueue.offer(task)) ,先判断状态,不是的话直接走下面,是的话走后面的条件,源码这里直接就进行了添加队列的操作,操作成功进入里面,不成功还是走外面,是不是很巧妙,这样节省了很多代码哎,其实熟悉下的话也不会觉得逻辑都多复杂
我们来说说期中的执行逻辑:
- 线程数小于核心线程数 - 启动新线程执行任务,启动成功退出
- 线程池不是运行状态 - 则尝试新线程执行任务,还是不行的话走异常策略
- 线程处于是运行状态 - 尝试添加任务到阻塞队列
线程池的策略是线程数 = 核心线程数了,优先把任务添加到阻塞队列里, 如果队列满了或是添加失败才尝试启动新线程执行任务,因为什么设计,是因为手机现在都是多核的,可以同时运行复数的线程,但是线程数要是超过 cpu 核心数的话,就会造成线程竞争 cpu 时间,来回切换线程会造成性能上的巨大损失,对于非 IO 型的高计算密度任务来说这是得不偿失的,还不如大家排队等着快呢
执行流程线程池是如何实现循环的
答案就在于线程池自己线程类型了 Worker 了,Worker 是线程池堆线程的封装,但是 Worker 并不是继承 Thread 的,而是实现了 Runnable 接口,其成员变量有个 thread 对象
private final class Worker implements Runnable{
final Thread thread;
Runnable firstTask;
volatile long completedTasks;
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
}
大家想啊 Worker 既然是实现的 Runnable 接口,那么名堂自然就在 run 方法里了
public void run() {
runWorker(this);
}
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
答案在 while (task != null || (task = getTask()) != null) 这个循环里,这里又是非常巧妙的运用了 || 的特性,前面的是 true 后面的条件就不知行了,前面不是 true 才执行后面的条件,firstTask 不是 null 直接进入里面执行,firstTask 执行完了会不停的 getTask 获取任务,这里就循环的跑起来了
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// 判断线程是不是核心线程
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 这里通过 time 判断是不是核心线程
// 非核心线程就 poll 取任务,指定时间拿不到任务非核心线程就关闭了
// 核心线程就 take 获取任务,没有任务就阻塞在这里了
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
线程池的运行基本上就这么多了,了解了线程池如何添加任务,worker 任何循环跑来起获取任务就 ok 了
网友评论