背景介绍
在项目中,为了提高系统性能使用了RxJava实现异步方案,其中异步线程池是自建的。但是当QPS稍微增大之后却发现系统假死、无响应和返回,调用方出现大量超时现象。但是通过监控发现,系统线程数正常,内存使用也没有问题,也没有死锁。
在当时没有定位到问题,通过增加一台机器解决了问题。后面也出现同样问题,定位发现是线程池的问题,因为时间紧迫没有深究直接使用了RxJava的Schedulers.io()线程替代自带线程池。最近通过重新学习,发现了问题的原因。
场景模拟
自建线程池
public class ThreadSchedulerUtils {
private final static ThreadFactory ioFactory = new ThreadFactoryBuilder().setNameFormat("io-pool-%d").build();
private final static ExecutorService ioTreadPool = new CatExecutorServiceTraceWrapper(new ThreadPoolExecutor(2, 4, 100L, TimeUnit.MICROSECONDS, new LinkedBlockingDeque<Runnable>(4), ioFactory, new ThreadPoolExecutor.AbortPolicy()));
public static Scheduler getIO() {
return Schedulers.from(ioTreadPool);
}
}
模拟调用代码
@Test
public void test12() {
Long aLong = Flowable.fromIterable(IntStream.range(1, 15).mapToObj(i -> i).collect(Collectors.toList())).parallel(8).runOn(ThreadSchedulerUtils.getIO()).map(i -> {
System.out.println("当前值:" + i + ",线程名称" + Thread.currentThread().getName());
Thread.sleep(1000L);
return i;
}).sequential().count().blockingGet();
}
运行结果
当前值:1,线程名称io-pool-0
当前值:2,线程名称io-pool-1
当前值:7,线程名称io-pool-2
当前值:8,线程名称io-pool-3
当前值:9,线程名称io-pool-0
当前值:10,线程名称io-pool-1
当前值:3,线程名称io-pool-2
当前值:4,线程名称io-pool-3
当前值:11,线程名称io-pool-2
当前值:6,线程名称io-pool-0
当前值:5,线程名称io-pool-1
当前值:12,线程名称io-pool-3
当前值:14,线程名称io-pool-0
当前值:13,线程名称io-pool-1
结果分析
代码期望的结果是把1到14这15个数据(Flowable.fromIterable(IntStream.range(1, 15).mapToObj(i -> i).collect(Collectors.toList())))并发在4个线程(parallel(8)表示最多有8个线程)上去执行。但是发现真实结果并不是有序的,而是先在0和1线程上执行了1、2,接着在2和3线程上执行了7、8。这种不符合预期的情况是为什么呢?
因为在创建线程池的时候使用的LinkedBlockingDeque。LinkedBlockingDeque队列的执行顺序是这样:
1.如果线程数<=核心线程数,则分配到核心线程数;
2.如果线程数<=核心线程数+队列大小,则不新建线程,在原有线程上轮询执行;
3.如果核心线程数+队列大小<线程数<=核心线程数+队列大小+最大线程数,则会创建新的线程来执行;
4.如果线程数>核心线程数+队列大小+最大线程数,则报错。
通过运行结果来分析可以验证:1,2这两个值在核心线程1,2上执行。然后3-6缓存起来之后队列已满,7和8在新线程2和3上执行。
但是前面说线程数超出之后会报错,为什么测试代码没有报错呢?因为RxJava的Flowable提供背压模式,可以根据下游的处理速度决定发生速度,所以并没有发生报错。
即先缓存满了之后再新建线程,但是在创建线程池的时候理解是创建最大线程数之后还不够再缓存。
原因总结
通过场景复现可以发现,是对线程池的不正确使用,导致可用线程数只为核心线程数,造成大量请求在队列和Flowable里面积压。表现的样子为:系统假死,无法正常处理请求,但是系统的各项指标却是正常的。
改进方案
分为两步方案来解决,第一次采用RxJava自带的线程方法:Schedulers.io();第二种是自己创建缓存线程池。
RxJava自带
RxJava自带的Schedulers.io()线程本质上是一个无边界线程池。可以参考RxJava 内部如何管理线程。但是这种方式有个缺点,没有经过自己的业务封装,不利于业务追踪。例如在监控系统上追踪请求情况。
自己的线程池
为了方便在cat线监控线程执行信息,实现一个和RxJava自带一直的无边界线程池。这种可以根据自己的需求,进行线程池封装。
public class ThreadSchedulerUtils {
private final static ThreadFactory ioFactory = new ThreadFactoryBuilder().setNameFormat("io-pool-%d").build();
private final static ExecutorService ioTreadPool = new ThreadPoolExecutor(8, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), ioFactory, new ThreadPoolExecutor.AbortPolicy());
public static Scheduler getIO() {
return Schedulers.from(ioTreadPool);
}
}
网友评论