写在前面
咱们程序员都知道,多线程是个好东西,可以异步或者并行执行任务,提高程序性能,然而多线程又好比一把双刃剑,用的好可以大幅提高程序性能,用的不好,就有可能导致程序异常,甚至崩溃。前段时间,就不慎踩坑,记录分享一下,大家一起来避坑。由于业务代码复杂,我这里按照当时业务场景的模式,写了一个可执行的demo。
出了啥问题?
程序上线后不久,便出现假死,无法正常工作。重启后涛声依旧。
问题排查与分析
上服务器首先top
、df
、free
三连击,结果CPU正常,磁盘正常,内存正常。并没有发现什么异常,然后通过阿里神器arthas
进行分析。
arthas 分析
Arthas 用户文档 https://alibaba.github.io/arthas/
- 进入arthas
java -jar arthas-boot.jar
- 选择java pid
- 打开dashboard
- 检查线程 thread
打开arthas
image.png进入arthas,选择java进程
dashboard
image.png打开dashboard看看
看看线程
thread -b 看看有无死锁,并没发现死锁
[arthas@68541]$ thread -b
No most blocking thread found!
Affect(row-cnt:0) cost in 21 ms.
image.pngthread 看看线程情况,结果发现大量WAITING线程。
jstack 看看线程出现什么问题了
image.pngjstack 68541,找到了一些症状,定位到了WAITING的代码所在位置。
分析代码
找到代码位置,下面我贴出代码。
入口
package threadDeadLock;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
/**
* @author William Zhang
* @date 2020/7/3 3:19 下午
* @description
*/
public class ThreadDeadLockDemo {
/**
* 初始化线程池
*/
public static final ThreadPoolExecutor executor = new ThreadPoolExecutor(
50, 100, 0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(10000), r -> new Thread(r, "zw-test-thread-pool"));
/**
* 程序入口
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
/**mock一定数量的任务**/
List<Integer> skuIds = IntStream.rangeClosed(1, 100).boxed().collect(Collectors.toList());
/**开始任务**/
TaskAService taskAService = new TaskAService();
taskAService.taskA(skuIds);
/**完成后停止线程组**/
executor.shutdown();
}
}
ServiceA
package threadDeadLock;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;
/**
* @author William Zhang
* @date 2020/7/3 3:49 下午
* @description
*/
public class TaskAService {
private TaskBService taskBService = new TaskBService();
/**
* taskA,对传入的用户id集合,进行并行操作。
* @param userIds
* @throws Exception
*/
public void taskA(List<Integer> userIds) throws Exception{
System.out.println("taskA begin ...");
List<Future> futures = new ArrayList<>();
for (Integer userId : userIds) {
// 并行执行任务
Future<?> future = ThreadDeadLockDemo.executor.submit(() -> {
try {
taskBService.taskB(userId);
} catch (Exception e) {
e.printStackTrace();
}
});
futures.add(future);
}
// 等所有任务完成后返回
for (Future future : futures) {
future.get();
}
System.out.println("taskA end ...");
}
}
ServiceB
package threadDeadLock;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;
/**
* @author William Zhang
* @date 2020/7/3 3:49 下午
* @description
*/
public class TaskBService {
/**
* 查询商品
*/
public Integer taskB(Integer userId) throws Exception{
System.out.println("taskB 开始...");
// 这里也是模拟并行查询用户明细
List<Future> futures = new ArrayList<>();
futures.add(queryUserInfoA(userId));
futures.add(queryUserInfoB(userId));
futures.add(queryUserInfoC(userId));
futures.add(queryUserInfoD(userId));
futures.add(queryUserInfoE(userId));
futures.add(queryUserInfoF(userId));
for (Future future : futures) {
future.get();
}
System.out.println("taskB 结束...");
return userId;
}
/*********** 下面都是模拟查询用户xxx信息的接口***********/
private Future queryUserInfoA(Integer userId){
Future<?> future = ThreadDeadLockDemo.executor.submit(() -> {
try {
// 模拟查询过程
Thread.sleep(2000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
return future;
}
private Future queryUserInfoB(Integer userId){
return queryUserInfoA(userId);
}
private Future queryUserInfoC(Integer userId){
return queryUserInfoA(userId);
}
private Future queryUserInfoD(Integer userId){
return queryUserInfoA(userId);
}
private Future queryUserInfoE(Integer userId){
return queryUserInfoA(userId);
}
private Future queryUserInfoF(Integer userId){
return queryUserInfoA(userId);
}
}
解密
问题非常隐晦,定位到的代码,看起来一切正常,仔细分析后发现,A方法里面有future get,然后taskA里面调用的taskB方法,也是future get,由于get会阻塞线程,那会不会是这里出现了问题呢?
是的,线程数池的资源是宝贵的,这里形成了类似“死锁”的情景。执行taskA,它要等待taskB全部执行完成,如果此时线程池全部被taskA任务占用,taskB就进入阻塞队列,等待执行。而且阻塞队列设置的过长,无法填满,从而无法激活corePoolSize以外的线程,此时corePoolSize里面的线程没有执行完成,taskB在阻塞队列中等待,无法执行,那么也就导致taskA一直等待,这样就形成了“死锁”。
总结
在开发时,我们要注意以下几点:
- 设置合理的
corePoolSize
,maximumPoolSize
,queueSize
。corePoolSize
太小无法利用CPU资源,太大增加线程上下文切换反而耗费资源,一般设置CPU核心数2倍。 -
queueSize
同样非常重要,太大的话,其实是无法激活corePoolSize
以外的线程来进行工作,具体数值按照业务来估算。 - 线程池隔离,对于特定业务,使用专用的线程池,隔离线程之间的干扰。
- future.get(),要设置超时时间,避免堵死整个线程。
更新后的代码
/**
* 初始化线程池
*/
public static final ThreadPoolExecutor taskAExecutor = new ThreadPoolExecutor(
5, 100, 0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(10000), r -> new Thread(r, "zw-test-taskA-thread-pool"));
public static final ThreadPoolExecutor taskBExecutor = new ThreadPoolExecutor(
5, 100, 0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(1000), r -> new Thread(r, "zw-test-taskB-thread-pool"));
/*********** 下面都是模拟查询用户xxx信息的接口***********/
private Future queryUserInfoA(Integer userId){
Future<?> future = ThreadDeadLockDemo.taskBExecutor.submit(() -> {
try {
// 模拟查询过程
Thread.sleep(500L);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
return future;
}
网友评论