背景
最近在做Android
应用线程优化,其中有一个核心指标就是收敛进程中的线程数,这是一段很长的故事,本文只是关于RxJava
的一个方面的优化,其中有些坑值得每位使用RxJava
的筒子注意。背景是这样的,我们APP在进入之后,通过一些正常业务流程的使用,稳定之后,通过Android Profiler
发现有一类RxComputationScheduler-
的线程,数量为8个(我的测试机有8个计算核心)。
通过直接搜索关键词,RxComputationScheduler-
很容易就定位到了它是RxJava默认提供的computation
调度器产生的线程。我印象里面,我们很少使用computation
调度器。这8个线程几乎没有任何负载,也就说它们虽然存在,却一直在睡觉😂,总所周知,Java的线程模型和系统线程模型是1:1映射关系,所以这些睡大觉的家伙是我这次要干掉的!
别的不说,至少Rx在使用线程的时候,还是挺规范的,它会给自己使用的线程命名,这样在进行线程调试的时候,我们能找到对应的线程调起方,至于反面呢?我直接上图吧
image.png
这些线程直接使用Executors.defaultThreadFactory 为线程池指定ThreadFactory,这样的后果就是我们这里看到了一堆pool-${线程池编号}-thread-${线程编号}
的僵尸线程,试问如果我们要去进行线程优化、锁排查,怎么去定位问题?
PS:就算他们没有指定线程名字,也难不住聪明又伶俐的我,后面我会介绍一种定位这种僵尸线程的方法 😊
调度器之殇
既然是computation
调度器产生的僵尸线程,那么关于computation
调度器,看名字都知道它其实是Rx提供给开发者进行CPU密集型任务的调度器,为什么这么说?因为computation
调度器内部最多只会创建当前设备的计算核心个数的线程(注意,它不是采用线程池来实现的)。
CPU密集型任务是和IO密集型任务对应的,所谓CPU密集型,指的是任务是大规模的计算工作,会一直占用CPU,所以对于这类任务,线程数超过计算核心没有任何意义,因为他们很少会把线程挂起,增加线程只会导致线程直接争抢时间片和上下文切换带来的开销,所以一般来说,CPU密集型任务设计的线程池中线程个数都需要严格限制(常用计算核心数)
IO密集型任务,是我们最常见的,比如发送个网络请求,比如读写个文件,这类任务的突出特征就是对于CPU占用少,一般都会阻塞在IO设备上面,所以对于这些任务,通常我们会设置比较大的线程数量,因为反正它们执行期间大部分时间都是在睡觉,那么更多的线程可以提高系统的吞吐量。
一些语言中常见的协程,其实就是为了解决我们创建过多线程,然后其实对于CPU使用时间很短,很多线程在占用系统资源,所以在语言层面提供一种新思维,不去阻塞系统线程,在一个线程上面处理多个IO任务;
既然如此,看起来就是业务中使用了computation
调度器,导致系统中产生了8个计算线程,那它们为什么不会被回收呢?这就需要看一下代码了,computation
默认基于EventLoopsScheduler
来实现的,它内部使用自定义的一个类来做线程管理:
static final class FixedSchedulerPool {
// 默认可计算核心数量
final int cores;
//poolworker就是一个NewThreadWorker,直接通过一个线程数组来管理线程
final PoolWorker[] eventLoops;
long n;
FixedSchedulerPool(ThreadFactory threadFactory, int maxThreads) {
// initialize event loops
this.cores = maxThreads;
this.eventLoops = new PoolWorker[maxThreads];
for (int i = 0; i < maxThreads; i++) {
//直接一上来就初始化数组,生成各个NewThreadWorker
this.eventLoops[i] = new PoolWorker(threadFactory);
}
}
//获取Worker直接是内部计数器 和 cores取余保证任务在各个Worker来回分配
public PoolWorker getEventLoop() {
int c = cores;
if (c == 0) {
return SHUTDOWN_WORKER;
}
// simple round robin, improvements to come
return eventLoops[(int)(n++ % c)];
}
}
EventLoopsScheduler
创建Worker就简单了,直接从上面的数据结构中取出一个PoolWorker即可,然后给EventLoopWorker
包装一下:
public Worker createWorker() {
return new EventLoopWorker(pool.get().getEventLoop());
}
我前面说过了,PoolWorker只是一个普通的NewThreadWorker,所以这个EventLoopWorker
的包装肯定做了什么不可告人的秘密:
private static class EventLoopWorker extends Scheduler.Worker {
public void unsubscribe() {
both.unsubscribe();
}
@Override
public Subscription schedule(final Action0 action) {
return poolWorker.scheduleActual(new Action0() {
@Override
public void call() {
if (isUnsubscribed()) {
return;
}
action.call();
}
}, 0, null, serial);
}
}
ok,如你所见,这个EventLoopWorker好像啥都没干呀,只是把任务代理给了之前传进来的Worker,然而你在仔细看看它的unsubscribe
方法,调用了both的反注册,而这个both仅仅是每次Worker. schedule任务的Subscription,它并没有去调用Worker的unsubscribe
(Super),那Super中做了什么呢?NewThreadWorker:
@Override
public void unsubscribe() {
isUnsubscribed = true;
executor.shutdownNow();
deregisterExecutor(executor);
}
所以结论就是EventLoopWorker把unsubscribe的线程关闭代码给去掉了,😂
直接上结论吧:computation
调度器在使用过程中会创建线程核心数个数的线程,然后这些线程会一直存活。
因为RxJava并不是一个针对移动端设计的框架,所以在服务端来说,通常准备8个左右线程进行计算工作没有问题,然而客户端上业务进行纯计算的任务实在是太少了,而且不会存在很高的并发度,所以浪费八个线程一直在这里睡觉,感觉不太合适,怎么破?
所以在移动端来说,我觉得通过直接创建线程来处理计算任务是合适的,处理完,直接释放。
RxJavaPlugins.getInstance().registerSchedulersHook(new RxJavaSchedulersHook() {
@Override
public Scheduler getIOScheduler() {
return new CachedThreadScheduler(new MYRxThreadFactory("MYRxIoScheduler-"));
}
@Override
public Scheduler getComputationScheduler() {
return createNewThreadScheduler(new RxThreadFactory("RxCom"));
}
});
这里这么成NewThread其实有问题,后面会解释,这样方便现在调试和定位问题
所以我通过RxJavaPlugins中修改computation
默认的行为,改成每次都创建线程(名称为RxCom),这次修复之后,满心欢喜,build,run,打开Android Profiler:
还是有五位大爷稳坐钓鱼台,然后:
image.png
RxBus惹的祸
看到上面还存在5个线程,我内心很崩溃了,不是都NewThreadWorder了么,怎么还没有被回收?看起来我们得找到这5个线程是哪些地方打开的了!这里我取了点巧,使用了一个hook库:
Epic,它基于Xposed,用来Hook自己进程,所以我的思路也很清楚,HOOK开启线程的代码,加入日志,存储对应线程的名称,然后不就找到罪魁祸首了么?
so:
这样就hook住每次打开创建线程的方法(线程的构造函数)了,在hook方法里面:
image.png
通过存储线程名和当前堆栈,然后在run起来吧~
然后在Android Profiler中找到对应的线程名,它不就是我这个Map里面的Key吗?
这样我就拿到了宝贵的启动堆栈:
java.lang.Throwable
at com.sankuai.movie.ThreadMethodHook.afterHookedMethod(ThreadMethodHook.java:29)
at com.taobao.android.dexposed.DexposedBridge.handleHookedArtMethod(DexposedBridge.java:273)
at me.weishu.epic.art.entry.Entry.onHookObject(Entry.java:69)
at me.weishu.epic.art.entry.Entry.referenceBridge(Entry.java:186)
at rx.internal.util.RxThreadFactory.newThread(RxThreadFactory.java:39)
at java.util.concurrent.ThreadPoolExecutor$Worker.<init>(ThreadPoolExecutor.java:631)
at java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:945)
at java.util.concurrent.ThreadPoolExecutor.ensurePrestart(ThreadPoolExecutor.java:1611)
at java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:342)
at java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:562)
at java.util.concurrent.ScheduledThreadPoolExecutor.submit(ScheduledThreadPoolExecutor.java:664)
at rx.internal.schedulers.NewThreadWorker.scheduleActual(NewThreadWorker.java:240)
at rx.internal.schedulers.NewThreadWorker.schedule(NewThreadWorker.java:224)
at rx.internal.schedulers.NewThreadWorker.schedule(NewThreadWorker.java:216)
at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.schedule(OperatorObserveOn.java:190)
at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber$1.request(OperatorObserveOn.java:147)
at rx.Subscriber.setProducer(Subscriber.java:209)
at rx.Subscriber.setProducer(Subscriber.java:205)
at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.init(OperatorObserveOn.java:141)
at rx.internal.operators.OperatorObserveOn.call(OperatorObserveOn.java:75)
at rx.internal.operators.OperatorObserveOn.call(OperatorObserveOn.java:40)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:46)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
at rx.Observable.subscribe(Observable.java:8759)
at rx.Observable.subscribe(Observable.java:8726)
at rx.Observable.subscribe(Observable.java:8581)
at com.dianping.nvnetwork.tunnel2.ConnectionPoolManager.<init>(ConnectionPoolManager.java:105)
at com.dianping.nvnetwork.tunnel2.NIOTunnel.<init>(NIOTunnel.java:58)
at com.dianping.nvnetwork.tunnel2.RxAndroidNIOTunnelService.<init>(RxAndroidNIOTunnelService.java:51)
忽略N行
PS:还记得之前我说过,有一堆僵尸线程没有命名怎么查找么,你GET到方法了吗?😝
我们发现是某个Manager代码启动的这个线程,根据Log点进去看看:
看起来没有任何毛病呀,一个RxBus订阅,结果切换到
computation
线程,然后计算工作,不过这里看着隐隐有点担心,总所周知,时间总线的订阅是没有结束时候的,所以这个流一直在注册中,现在的问题就简单了,observeOn到底拿这个computation
调度器做了什么导致它不能回收了呢?
这里又涉及到RxJava关于lift和OperatorObserveOn两个类的讲解,但是由于篇幅原因,我这里不去展开说明了,observeOn中会依据给的Scheduler,create一个Worker,然后在流完成之后,会被反注册;因为我目前指定的是NewThread调度器,Worker和Thread一一对应,Thread存活也就不意外啦!
解决方案
本来我是想把computation
调度器直接替换成newThread
,然而我看到下图的时候还是有些震惊的,就一个启动,computation
就给我霍霍了66个线程,所以我们还是得考虑线程复用的问题;
直接上最后的方案吧:
image.png
核心是我们需要将核心线程数也可以超时,干掉!
通过Android Profiler再次打开,我们可以看到在最开始的时候,有一些
computation
调度器的线程存活,数量是8个,过一段时间之后,这些线程就自己销毁啦!
至此,我们项目中又被干掉了8个僵尸线程!
网友评论