最近我们组杨青同学遇到一个使用线程池不当的问题:异步处理的线程池线程将主线程hang住了,分析代码发现是线程池的拒绝策略设置得不合理,设置为CallerRunsPolicy。当异步线程的执行效率降低时,阻塞队列满了,触发了拒绝策略,进而导致主线程hang死。
从这个问题中,我们学到了两点:
- 线程池的使用,需要充分分析业务场景后作出选择,必要的情况下需要自定义线程池;
- 线程池的运行状况,也需要监控
关于线程池的监控,我参考了《Java编程的艺术》中提供的思路实现的,分享下我的代码片段,如下:
public class AsyncThreadExecutor implements AutoCloseable {
private static final int DEFAULT_QUEUE_SIZE = 1000;
private static final int DEFAULT_POOL_SIZE = 10;
@Setter
private int queueSize = DEFAULT_QUEUE_SIZE;
@Setter
private int poolSize = DEFAULT_POOL_SIZE;
/**
* 用于周期性监控线程池的运行状态
*/
private final ScheduledExecutorService scheduledExecutorService =
Executors.newSingleThreadScheduledExecutor(new BasicThreadFactory.Builder().namingPattern("async thread executor monitor").build());
/**
* 自定义异步线程池
* (1)任务队列使用有界队列
* (2)自定义拒绝策略
*/
private final ThreadPoolExecutor threadPoolExecutor =
new ThreadPoolExecutor(poolSize, poolSize, 0, TimeUnit.MILLISECONDS, new ArrayBlockingQueue(queueSize),
new BasicThreadFactory.Builder().namingPattern("async-thread-%d").build(),
(r, executor) -> log.error("the async executor pool is full!!"));
private final ExecutorService executorService = threadPoolExecutor;
@PostConstruct
public void init() {
scheduledExecutorService.scheduleAtFixedRate(() -> {
/**
* 线程池需要执行的任务数
*/
long taskCount = threadPoolExecutor.getTaskCount();
/**
* 线程池在运行过程中已完成的任务数
*/
long completedTaskCount = threadPoolExecutor.getCompletedTaskCount();
/**
* 曾经创建过的最大线程数
*/
long largestPoolSize = threadPoolExecutor.getLargestPoolSize();
/**
* 线程池里的线程数量
*/
long poolSize = threadPoolExecutor.getPoolSize();
/**
* 线程池里活跃的线程数量
*/
long activeCount = threadPoolExecutor.getActiveCount();
log.info("async-executor monitor. taskCount:{}, completedTaskCount:{}, largestPoolSize:{}, poolSize:{}, activeCount:{}",
taskCount, completedTaskCount, largestPoolSize, poolSize, activeCount);
}, 0, 10, TimeUnit.MINUTES);
}
public void execute(Runnable task) {
executorService.execute(task);
}
@Override
public void close() throws Exception {
executorService.shutdown();
}
}
这里的主要思路是:(1)使用有界队列的固定数量线程池;(2)拒绝策略是将任务丢弃,但是需要记录错误日志;(3)使用一个调度线程池对业务线程池进行监控。
在查看监控日志的时候,看到下图所示的监控日志:
屏幕快照 2018-03-28 21.55.19.png
这里我对largestPooSize的含义比较困惑,按字面理解是“最大的线程池数量”,但是按照线程池的定义,maximumPoolSize和coreSize相同的时候(在这里,都是10),一个线程池里的最大线程数是10,那么为什么largestPooSize可以是39呢?我去翻这块的源码:
/**
* Returns the largest number of threads that have ever
* simultaneously been in the pool.
*
* @return the number of threads
*/
public int getLargestPoolSize() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
return largestPoolSize;
} finally {
mainLock.unlock();
}
}
注释的翻译是:返回在这个线程池里曾经同时存在过的线程数。再看这个变量largestPoolSize在ThreadExecutor中的赋值的地方,代码如下:
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;//这里这里!
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
发现两点:
- largestPoolSize是worker集合的历史最大值,只增不减。largestPoolSize的大小是线程池曾创建的线程个数,跟线程池的容量无关;
- largestPoolSize<=maximumPoolSize。
PS:杨青同学是这篇文章的灵感来源,他做了很多压测。给了我很多思路,并跟我一起分析了一些代码。
网友评论