做事要有目标,看源码首先要确定需要追踪理解的问题,不然容易陷入逻辑泥潭中,理不清头绪。
因此本分结构分两部分
- 常见问题
- 源码分析,任务如何被执行?
不废话直奔主题(jdk 1.8.0 不同版本还是有不小差异)
(一)常见问题
- 为什么要使用线程池,有啥好处?
- 创建线程池时,构造函数中几个参数的作用?
- Executors中几种线程池的差异?
1. 为什么要使用线程池,有啥好处?
- 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
- 提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
- 提高线程的可管理性。可以提供定时、定期、单线程、并发数控制等功能
2. 创建线程池时,构造函数中几个参数的作用?
线程池构造方法,但最终是调用这个方法。
一共七个参数。
public ThreadPoolExecutor(int corePoolSize, //核心线程池大小
int maximumPoolSize, //线程池最大大小
long keepAliveTime, //线程空闲活动时间
TimeUnit unit, //线程空闲活动时间单位
BlockingQueue<Runnable> workQueue, //任务队列
ThreadFactory threadFactory, //创建线程的工厂
RejectedExecutionHandler handler) //拒绝策略
-
corePoolSize(核心线程数量):核心线程会一直存活,即使没有任务需要执行。当线程数少于核心线程数时,有新任务需要执行,即使有空闲线程,线程池也会优先创建新线程而不是使用空闲线程处理。
- workQueue(任务队列):当核心线程数达到最大时,新任务会放在队列中排队等待执行。可以选择以下几种阻塞队列:
ArrayBlockingQueue // 是一个基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序。
LinkedBlockingQueue //一个基于链表结构的阻塞队列,此队列按FIFO (先进先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列。
SynchronousQueue //一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列。
PriorityBlockingQueue //一个具有优先级的无限阻塞队列。
-
maximumPoolSize(最大线程数量):线程池允许线程创建的最大线程数。
线程数>=corePoolSize,且任务队列已满时:线程池会创建新线程来处理任务
。线程数=maxPoolSize,且任务队列已满时:线程池会使用拒绝策略处理任务
-
keepAliveTime(线程空闲活动时间)当线程空闲时间达到keepAliveTime时,线程会退出,直到线程数量=corePoolSize;如果allowCoreThreadTimeout=true,则会直到线程数量=0。
-
threadFactory(创建线程的工厂):默认使用Executors内部类DefaultThreadFactory。(通过实现ThreadFactory接口,给每个创建出来的线程设置有意义的名字,排查线程问题时十分有帮助。阿里巴巴JAVA开发手册中也要求这样做,尽量不要直接使用Executors中的方法创建线程池。)
-
handler(拒绝策略):默认使用AbortPolicy。当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。可以选择以下几类拒绝策略:
AbortPolicy:直接抛出异常。默认策略
CallerRunsPolicy:使用调用者所在线程来运行任务
DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务
DiscardPolicy:不处理,丢弃掉
当然也可以根据应用场景需要来实现RejectedExecutionHandler接口自定义策略。
3.Executors中几种线程池的差异?
- newFixedThreadPool
/**
* corePoolSize:nThreads
* maximumPoolSize: nThreads
* keepAliveTime:0
* workQueue:LinkedBlockingQueue(容量:Integer.MAX_VALUE)
* threadFactory:默认值
* handler:默认值
*/
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
- newSingleThreadExecutor
/**
* corePoolSize:1
* maximumPoolSize: 1
* keepAliveTime:0
* workQueue:LinkedBlockingQueue(容量:Integer.MAX_VALUE)
* threadFactory:默认值
* handler:默认值
*/
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
- newCachedThreadPool
/**
* corePoolSize:0
* maximumPoolSize: Integer.MAX_VALUE
* keepAliveTime:60s
* workQueue:SynchronousQueue
* threadFactory:默认值
* handler:默认值
*/
public static ExecutorSersvice newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
(二)源码分析,任务如何被执行?
线程池状态(5个)
- RUNNING(-1):接受新任务,同时执行任务队列中的任务
- SHUTDOWN(0):不接受新任务,但是仍然执行任务队列中的任务
- STOP(1):不接受新任务,不执行任务队列中的任务,并且尝试终止正在执行的任务
- TIDYING(2):所有工作线程已经销毁,任务缓存队列已经清空或执行结束。执行terminated()钩子方法
- TERMINATED(3):terminated()方法执行完成
重要接口
-
ThreadFactory(线程工厂)
线程池通过此接口创建新线程
public interface ThreadFactory {
Thread newThread(Runnable r);
}
默认实现DefaultThreadFactory
static class DefaultThreadFactory implements ThreadFactory {
// 使用默认线程工厂的线程池数量
private static final AtomicInteger poolNumber = new AtomicInteger(1);
// 当前线程池中线程所属的线程组
private final ThreadGroup group;
// 当前线程池中累计创建的线程数量
private final AtomicInteger threadNumber = new AtomicInteger(1);
// 当前线程池中线程名称前缀
private final String namePrefix;
DefaultThreadFactory() {
// 声明安全管理器
SecurityManager s = System.getSecurityManager();
// 获取线程组
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
// 线程名前缀,例如 "pool-1-thread-"
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
// 设置线程t为前台线程 0);
if (t.isDaemon())
t.setDaemon(false);
// 设置线程t的优先级为NORM_PRIORITY
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
-
RejectedExecutionHandler(拒绝策略)
指当任务添加到线程池中被拒绝,而采取的处理措施。
可能原因:1.任务队列已满,线程数量达到最大限制;2.线程池关闭(SHUTDOWN);
public interface RejectedExecutionHandler {
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
默认实现AbortPolicy
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { }
// 简单粗暴,直接抛出异常
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
重要成员变量
//任务缓存队列,用来存放等待执行的任务
private final BlockingQueue<Runnable> workQueue;
//线程池的主要状态锁,对线程池状态(比如线程池大小、runState等)的改变都要使用这个锁
private final ReentrantLock mainLock = new ReentrantLock();
// 线程集合,访问时必须持有 mainLock
private final HashSet<Worker> workers = new HashSet<Worker>();
重要方法
- execute: 新任务是如何被添加的?
- 当线程数 < 核心线程数(corePoolSize): 不管有没有线程空闲,直接创建新线程执行任务,。
- 当线程数 >= 核心线程数,且任务队列未满: 将任务放入任务队列(workQueue)。
- 当线程数 >= 核心线程数,且任务队列已满:若线程数 < 最大线程数(maximumPoolSize),创建新线程执行任务;若线程数 = 最大线程数(maximumPoolSize),使用拒绝策略(RejectedExecutionHandler)处理任务。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// 当前线程数小于核心线程数量时,新建线程
// 并将command作为首个任务开始执行
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 如果线程池处于RUNNING状态,并且任务添加
// 到队列成功
if (isRunning(c) && workQueue.offer(command)) {
//方法执行过程中,线程池可能发成改变,
// 需要double-check
int recheck = ctl.get();
// 线程池状态改变,非RUNNING,需要回滚
if (! isRunning(recheck) && remove(command))
reject(command);
// 原有线程可能可能已经结束
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 如果任务无法加入队列中,尝试新建非核心
// 线程执行任务。如果新建失败,则使用拒绝
// 策略处理任务
else if (!addWorker(command, false))
reject(command);
}
- runWorker: 新任务是如何被线程执行的?
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
// 创建线程时放入的首个任务
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 循环, 从队列中取出等待执行的任务
// 如果任务队列为空,超过keepAliveTime时长,
// getTask()返回为空,此时线程就会销毁
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 任务执行开始前调用,子类可重写此方法
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 任务执行结束后调用,子类可重写此方法
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 线程销毁
processWorkerExit(w, completedAbruptly);
}
}
- getTask: 任务如何从队列中取出?
private Runnable getTask() {
boolean timedOut = false;
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// 允许核心线程销毁或者线程数量大于核心线程数量
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 获取任务超时,线程数量减1,返回空
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 从阻塞队列中获取任务设定超时时间
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
网友评论