RxJava优化之干掉僵尸线程

作者: 楚云之南 | 来源:发表于2019-02-25 15:27 被阅读76次

    背景

    最近在做Android应用线程优化,其中有一个核心指标就是收敛进程中的线程数,这是一段很长的故事,本文只是关于RxJava的一个方面的优化,其中有些坑值得每位使用RxJava的筒子注意。背景是这样的,我们APP在进入之后,通过一些正常业务流程的使用,稳定之后,通过Android Profiler发现有一类RxComputationScheduler-的线程,数量为8个(我的测试机有8个计算核心)。

    image.png

    通过直接搜索关键词,RxComputationScheduler-很容易就定位到了它是RxJava默认提供的computation调度器产生的线程。我印象里面,我们很少使用computation调度器。这8个线程几乎没有任何负载,也就说它们虽然存在,却一直在睡觉😂,总所周知,Java的线程模型和系统线程模型是1:1映射关系,所以这些睡大觉的家伙是我这次要干掉的!

    别的不说,至少Rx在使用线程的时候,还是挺规范的,它会给自己使用的线程命名,这样在进行线程调试的时候,我们能找到对应的线程调起方,至于反面呢?我直接上图吧


    image.png

    这些线程直接使用Executors.defaultThreadFactory 为线程池指定ThreadFactory,这样的后果就是我们这里看到了一堆pool-${线程池编号}-thread-${线程编号}的僵尸线程,试问如果我们要去进行线程优化、锁排查,怎么去定位问题?

    image.png

    PS:就算他们没有指定线程名字,也难不住聪明又伶俐的我,后面我会介绍一种定位这种僵尸线程的方法 😊

    调度器之殇

    既然是computation调度器产生的僵尸线程,那么关于computation调度器,看名字都知道它其实是Rx提供给开发者进行CPU密集型任务的调度器,为什么这么说?因为computation调度器内部最多只会创建当前设备的计算核心个数的线程(注意,它不是采用线程池来实现的)。

    CPU密集型任务是和IO密集型任务对应的,所谓CPU密集型,指的是任务是大规模的计算工作,会一直占用CPU,所以对于这类任务,线程数超过计算核心没有任何意义,因为他们很少会把线程挂起,增加线程只会导致线程直接争抢时间片和上下文切换带来的开销,所以一般来说,CPU密集型任务设计的线程池中线程个数都需要严格限制(常用计算核心数)
    IO密集型任务,是我们最常见的,比如发送个网络请求,比如读写个文件,这类任务的突出特征就是对于CPU占用少,一般都会阻塞在IO设备上面,所以对于这些任务,通常我们会设置比较大的线程数量,因为反正它们执行期间大部分时间都是在睡觉,那么更多的线程可以提高系统的吞吐量。
    一些语言中常见的协程,其实就是为了解决我们创建过多线程,然后其实对于CPU使用时间很短,很多线程在占用系统资源,所以在语言层面提供一种新思维,不去阻塞系统线程,在一个线程上面处理多个IO任务;

    既然如此,看起来就是业务中使用了computation调度器,导致系统中产生了8个计算线程,那它们为什么不会被回收呢?这就需要看一下代码了,computation默认基于EventLoopsScheduler来实现的,它内部使用自定义的一个类来做线程管理:

    static final class FixedSchedulerPool {
       // 默认可计算核心数量
        final int cores; 
      //poolworker就是一个NewThreadWorker,直接通过一个线程数组来管理线程
        final PoolWorker[] eventLoops; 
        long n;
    
        FixedSchedulerPool(ThreadFactory threadFactory, int maxThreads) {
            // initialize event loops
            this.cores = maxThreads;
            this.eventLoops = new PoolWorker[maxThreads];
            for (int i = 0; i < maxThreads; i++) {
                //直接一上来就初始化数组,生成各个NewThreadWorker
                this.eventLoops[i] = new PoolWorker(threadFactory);
            }
        }
    
        //获取Worker直接是内部计数器 和 cores取余保证任务在各个Worker来回分配
        public PoolWorker getEventLoop() {
            int c = cores;
            if (c == 0) {
                return SHUTDOWN_WORKER;
            }
            // simple round robin, improvements to come
            return eventLoops[(int)(n++ % c)];
        } 
    }
    

    EventLoopsScheduler创建Worker就简单了,直接从上面的数据结构中取出一个PoolWorker即可,然后给EventLoopWorker包装一下:
    public Worker createWorker() {
    return new EventLoopWorker(pool.get().getEventLoop());
    }

    我前面说过了,PoolWorker只是一个普通的NewThreadWorker,所以这个EventLoopWorker的包装肯定做了什么不可告人的秘密:

     private static class EventLoopWorker extends Scheduler.Worker {
      
        public void unsubscribe() {
            both.unsubscribe();
        }
    
         @Override
        public Subscription schedule(final Action0 action) {
            return poolWorker.scheduleActual(new Action0() {
                @Override
                public void call() {
                    if (isUnsubscribed()) {
                        return;
                    }
                    action.call();
                }
            }, 0, null, serial);
        }
    }
    

    ok,如你所见,这个EventLoopWorker好像啥都没干呀,只是把任务代理给了之前传进来的Worker,然而你在仔细看看它的unsubscribe方法,调用了both的反注册,而这个both仅仅是每次Worker. schedule任务的Subscription,它并没有去调用Worker的unsubscribe(Super),那Super中做了什么呢?NewThreadWorker:

     @Override
    public void unsubscribe() {
        isUnsubscribed = true;
        executor.shutdownNow();
        deregisterExecutor(executor);
    }
    

    所以结论就是EventLoopWorker把unsubscribe的线程关闭代码给去掉了,😂

    直接上结论吧:computation调度器在使用过程中会创建线程核心数个数的线程,然后这些线程会一直存活。

    因为RxJava并不是一个针对移动端设计的框架,所以在服务端来说,通常准备8个左右线程进行计算工作没有问题,然而客户端上业务进行纯计算的任务实在是太少了,而且不会存在很高的并发度,所以浪费八个线程一直在这里睡觉,感觉不太合适,怎么破?

    所以在移动端来说,我觉得通过直接创建线程来处理计算任务是合适的,处理完,直接释放

     RxJavaPlugins.getInstance().registerSchedulersHook(new RxJavaSchedulersHook() {
            @Override
            public Scheduler getIOScheduler() {
                return new CachedThreadScheduler(new MYRxThreadFactory("MYRxIoScheduler-"));
            }
    
            @Override
            public Scheduler getComputationScheduler() {
                return  createNewThreadScheduler(new RxThreadFactory("RxCom"));
            }
        });
    

    这里这么成NewThread其实有问题,后面会解释,这样方便现在调试和定位问题

    所以我通过RxJavaPlugins中修改computation默认的行为,改成每次都创建线程(名称为RxCom),这次修复之后,满心欢喜,build,run,打开Android Profiler:

    image.png

    还是有五位大爷稳坐钓鱼台,然后:


    image.png

    RxBus惹的祸

    看到上面还存在5个线程,我内心很崩溃了,不是都NewThreadWorder了么,怎么还没有被回收?看起来我们得找到这5个线程是哪些地方打开的了!这里我取了点巧,使用了一个hook库:
    Epic,它基于Xposed,用来Hook自己进程,所以我的思路也很清楚,HOOK开启线程的代码,加入日志,存储对应线程的名称,然后不就找到罪魁祸首了么?
    so:

    image.png

    这样就hook住每次打开创建线程的方法(线程的构造函数)了,在hook方法里面:


    image.png

    通过存储线程名和当前堆栈,然后在run起来吧~
    然后在Android Profiler中找到对应的线程名,它不就是我这个Map里面的Key吗?

    image.png

    这样我就拿到了宝贵的启动堆栈:

    java.lang.Throwable
    at com.sankuai.movie.ThreadMethodHook.afterHookedMethod(ThreadMethodHook.java:29)
    at com.taobao.android.dexposed.DexposedBridge.handleHookedArtMethod(DexposedBridge.java:273)
    at me.weishu.epic.art.entry.Entry.onHookObject(Entry.java:69)
    at me.weishu.epic.art.entry.Entry.referenceBridge(Entry.java:186)
    at rx.internal.util.RxThreadFactory.newThread(RxThreadFactory.java:39)
    at java.util.concurrent.ThreadPoolExecutor$Worker.<init>(ThreadPoolExecutor.java:631)
    at java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:945)
    at java.util.concurrent.ThreadPoolExecutor.ensurePrestart(ThreadPoolExecutor.java:1611)
    at java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:342)
    at java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:562)
    at java.util.concurrent.ScheduledThreadPoolExecutor.submit(ScheduledThreadPoolExecutor.java:664)
    at rx.internal.schedulers.NewThreadWorker.scheduleActual(NewThreadWorker.java:240)
    at rx.internal.schedulers.NewThreadWorker.schedule(NewThreadWorker.java:224)
    at rx.internal.schedulers.NewThreadWorker.schedule(NewThreadWorker.java:216)
    at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.schedule(OperatorObserveOn.java:190)
    at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber$1.request(OperatorObserveOn.java:147)
    at rx.Subscriber.setProducer(Subscriber.java:209)
    at rx.Subscriber.setProducer(Subscriber.java:205)
    at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.init(OperatorObserveOn.java:141)
    at rx.internal.operators.OperatorObserveOn.call(OperatorObserveOn.java:75)
    at rx.internal.operators.OperatorObserveOn.call(OperatorObserveOn.java:40)
    at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:46)
    at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
    at rx.Observable.subscribe(Observable.java:8759)
    at rx.Observable.subscribe(Observable.java:8726)
    at rx.Observable.subscribe(Observable.java:8581)
    at com.dianping.nvnetwork.tunnel2.ConnectionPoolManager.<init>(ConnectionPoolManager.java:105)
    at com.dianping.nvnetwork.tunnel2.NIOTunnel.<init>(NIOTunnel.java:58)
    at com.dianping.nvnetwork.tunnel2.RxAndroidNIOTunnelService.<init>(RxAndroidNIOTunnelService.java:51)
    忽略N行
    

    PS:还记得之前我说过,有一堆僵尸线程没有命名怎么查找么,你GET到方法了吗?😝

    我们发现是某个Manager代码启动的这个线程,根据Log点进去看看:

    image.png
    看起来没有任何毛病呀,一个RxBus订阅,结果切换到computation线程,然后计算工作,不过这里看着隐隐有点担心,总所周知,时间总线的订阅是没有结束时候的,所以这个流一直在注册中,现在的问题就简单了,observeOn到底拿这个computation调度器做了什么导致它不能回收了呢?

    这里又涉及到RxJava关于lift和OperatorObserveOn两个类的讲解,但是由于篇幅原因,我这里不去展开说明了,observeOn中会依据给的Scheduler,create一个Worker,然后在流完成之后,会被反注册;因为我目前指定的是NewThread调度器,Worker和Thread一一对应,Thread存活也就不意外啦!

    解决方案

    本来我是想把computation调度器直接替换成newThread ,然而我看到下图的时候还是有些震惊的,就一个启动,computation就给我霍霍了66个线程,所以我们还是得考虑线程复用的问题;

    image.png
    直接上最后的方案吧:
    image.png
    核心是我们需要将核心线程数也可以超时,干掉!
    通过Android Profiler再次打开,我们可以看到在最开始的时候,有一些computation调度器的线程存活,数量是8个,过一段时间之后,这些线程就自己销毁啦!

    至此,我们项目中又被干掉了8个僵尸线程!

    相关文章

      网友评论

        本文标题:RxJava优化之干掉僵尸线程

        本文链接:https://www.haomeiwen.com/subject/ywubyqtx.html