美文网首页Android-RxJavaRxJava
RxJava自带线程池监控和管理的探索之旅

RxJava自带线程池监控和管理的探索之旅

作者: 空指针tc | 来源:发表于2019-07-09 20:08 被阅读0次

    背景

    RxJava很方便的切换主子线程、指定任务运行的线程,在这个便利之后还隐藏着很多问题。比如IO scheduler是一个无上限线程池,如果短时间并发量过大,在手机端可能出现OOM或者pthread_create错误。另外,在实际业务中我们需要对执行的业务进行优先级区分,以便优先级高的任务先执行,想实现这个需求必然需要对RxJava默认的scheduler进行改造。本文将从RxJava IO scheduler分析、介绍线程池相关知识、如何对IO scheduler进行改造等方向进行介绍,并且对应用旧代码做到无侵入式的替换。

    线程池相关知识回顾

    1. 线程以及线程池的含义

    在介绍主体内容之前,我们先回顾下线程池的相关知识,这样能更好的理解本文章内容。从字面意思上来说,线程池肯定是一个装着线程的"池",小则是鱼塘,装的少,但是家里没矿只能承包这么大的鱼塘;当然如果是大佬,说不定这一片海都是他的。线程池肯定不是简单的承载容纳线程的池子,既然作为类似仓库的属性,必然有管理之意。

    线程是作为一个任务的执行承载者,接收来自程序的诸多执行请求,其中子线程是合理利用cpu性能避免阻塞主线程的存在。好东西都是不可贪多,线程如果不加以管理,肯定会被程序各处的代码随意创建,这样会浪费或者影响cpu某个时刻的性能,甚至导致当前进程出现异常。

    阻塞队列,这是线程池待执行任务的容器,负责管理要执行的任务。阻塞二字说明它的入队和出队是可控的,所谓阻塞,在某些情况下会挂起线程(即阻塞),一旦条件满足,被挂起的线程又会自动被唤醒。按照队列的数据结构进行出入某一个任务时会阻塞,这样在多线程环境下才更安全的生产任务和消费任务。

    比较通俗的一个例子有仓库提取货物,如果仓库里只有有限的几个小车运输货物,此时有很多运输员来提货,肯定要遇到争夺小车、等待小车。小车类似于cpu的核数,也可以理解为线程池允许创建的总数。总而言之,当前仓库(线程池)只有有限的几辆小车同时工作,每个运输员(程序代码块)想要获取货物(执行代码)就必须要争夺小车资源。当有空闲小车时,会按照一定规则分配给某个经销商,可能是队列的简单先后入队等待顺序,也可能是优先级(毕竟氪金无敌VIP)。当然有可能是大佬,家里矿多,说小车不够,我去买,这样会形成一个无限制上限线程池模型。不过这样做有一种风险就是仓库体积无法随意扩容(整个程序所承载的机器性能有限),买了太多小车放不下,然后整个仓库就瘫痪了,这时候可能OOM或pthread_create错误。小车每次使用完后,都会继续被分配到下一个任务,当然如果经销商的事情都处理完了,可能就都闲置了,有可能晚上没活,仓库就把小车都封存起来,整理回收到固定地方(超时闲置后回收非核心线程),有可能留下几辆预先说好的小车以便晚上有紧急货物时处理(核心线程常驻)。
    如下图所示,大致概念如下。绿色的取货车可能是因为取货车不够,临时采购或者借调的,类似于线程池临时开启新线程。红色的取货车区域是核心线程,有限的。

    一般核心线程数根据CPU数量来确定,线程池数大于CPU数量,看似是并发执行任务,其实是操作系统帮我们在按照一定时间片进行调度来执行任务,达到一种同时执行的效果,所以大量线程同时执行对CPU负载性能要求,会让机器达到50%甚至100%高负荷运作,此时整机机器发生出错的概率增大。所以同时执行很多任务导致频繁切换线程本身也是一种额外的开销 ,不建议如此操作,尤其是部分任务是低优先级且不重要、可延迟的。

    单个线程大概1MB左右开销,在Java内部开发版的JDK中,加入fiber这种新的任务调度模型,开销只有200KB左右,通过100W级任务调度测试,据介绍性能比thread优异很多,release版预测还会提高,特性类似于协程。不过我们Android不太可能使用付费JDK。

    线程池模型.png

    通过上面的描述我们简单的了解线程池与线程之间的关系,线程和调用者之间的联系,并且线程池运作时是有自己的规则设定,调用者和每个被管理的线程必须遵守规则。

    2. 线程池的种类

    java通过Executors提供了四种常用线程池,这四种本质上也是封装了自定义线程池的基本参数,简化了创建流程,提供特定的功能。

    先介绍下自定义线程池的基本参数和含义,这样更好理解下面的java封装好的线程池。

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)
    

    一个任务被提交时,它的大致流程都是相似的。下面是流程图(下图取自文章https://juejin.im/post/5b052dd7f265da0ba567e7f1

    原生线程池提交流程
    1. CachedThreadPool 缓存线程池
      这是一种线程数无上限的线程池,可以在有空闲线程时复用,无空闲时新建线程。默认情况下,60s回收空闲线程,并且阻塞队列为SynchronousQueue(这是一种无容量阻塞队列,当拿到任务入队时就判断是否有线程可以执行,如果有就立刻出队执行,否则就阻塞等待)
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
    
    1. FixedThreadPool 固定大小线程池
      线程池的线程总数是有上限的,当初始化线程池时可以设置它的容量,如果待执行任务超出总数就需要在队列中等待了。
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
    
    1. SchedulerThreadPool 定时周期任务线程池
      它很多参数都和固定大小线程池一样,除了阻塞队列选用了DelayQueue,这是一种按照延时长短排序的队列。
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }
    
    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE,
              DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
              new DelayedWorkQueue());
    }
    
    1. SingleThreadExecutor 单线程池
      只有一个线程,所以队列里的任务按照队列的出队规则逐个执行,队列采用的是一个链表结构的队列。
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
    

    3.自定义线程池相关类介绍

    一般都是继承ThreadPoolExecutor类(不继承也可以,但是继承是为了做更多的方法执行监控),然后根据需要设置下面7种参数。

        public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler) {
            if (corePoolSize < 0 ||
                maximumPoolSize <= 0 ||
                maximumPoolSize < corePoolSize ||
                keepAliveTime < 0)
                throw new IllegalArgumentException();
            if (workQueue == null || threadFactory == null || handler == null)
                throw new NullPointerException();
            this.corePoolSize = corePoolSize;
            this.maximumPoolSize = maximumPoolSize;
            this.workQueue = workQueue;
            this.keepAliveTime = unit.toNanos(keepAliveTime);
            this.threadFactory = threadFactory;
            this.handler = handler;
        }
    

    比较重要的就是阻塞队列workQueue和拒绝策略handler的选择,这个会在后续RxJava的IO scheduler监控方案里再次介绍。java提供的默认队列的种类有无限大小和有限,带优先级、带延时等,根据需要可以选择不同类型的队列。ThreadFactory 是构造新的thread的工厂,这里自定义一个可以进行新建线程的监控。拒绝策略handler有提供四种默认的策略,也可以自己实现接口RejectedExecutionHandler自己做特殊策略,比如移交任务到另外一个执行者,或者判断下这个任务的重要性,然后再抛弃。

    //ThreadPoolExecutor.CallerRunsPolicy:在调用者所在线程执行该任务
    
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                if (!e.isShutdown()) {
                    r.run();
                }
            }
    //ThreadPoolExecutor.AbortPolicy:放弃执行任务,抛出RejectedExecutionException异常。
    
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                throw new RejectedExecutionException("Task " + r.toString() +
                                                     " rejected from " +
                                                     e.toString());
            }
    
    //ThreadPoolExecutor.DiscardPolicy:放弃执行任务,不抛出异常。
    
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            }
    
    //ThreadPoolExecutor.DiscardOldestPolicy:poll出队一个最早任务,然后尝试执行它
    
     public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                if (!e.isShutdown()) {
                    e.getQueue().poll();
                    e.execute(r);
                }
            }
    
    

    IO scheduler的弊端

    RxJavaSchedulersHook类 里会生成IO Scheduler,默认调用CachedThreadScheduler。
    里面的CachedWorkerPool维护了一个线程管理的队列expiringWorkerQueue,
    默认是每隔60s就去通过evictor清除已经过期的线程,线程池没有上限。因此如果短时间内有大量任务要执行,会导致不停地创建新线程,所以存在出现pthread_create、OOM、耗费大量系统资源造成卡顿等问题。

      
    public final class CachedThreadScheduler extends Scheduler implements SchedulerLifecycle {
        private static final long KEEP_ALIVE_TIME;
        private static final TimeUnit KEEP_ALIVE_UNIT = TimeUnit.SECONDS;
    
        static final ThreadWorker SHUTDOWN_THREADWORKER;
    
        static final CachedWorkerPool NONE;
    
        final ThreadFactory threadFactory;
    
        final AtomicReference<CachedWorkerPool> pool;
    
        static {
            SHUTDOWN_THREADWORKER = new ThreadWorker(RxThreadFactory.NONE);
            SHUTDOWN_THREADWORKER.unsubscribe();
    
            NONE = new CachedWorkerPool(null, 0, null);
            NONE.shutdown();
    
            KEEP_ALIVE_TIME = Integer.getInteger("rx.io-scheduler.keepalive", 60);
        }
    
        static final class CachedWorkerPool {
            private final ThreadFactory threadFactory;
            private final long keepAliveTime;
            private final ConcurrentLinkedQueue<ThreadWorker> expiringWorkerQueue;
            private final CompositeSubscription allWorkers;
            private final ScheduledExecutorService evictorService;
            private final Future<?> evictorTask;
    
            CachedWorkerPool(final ThreadFactory threadFactory, long keepAliveTime, TimeUnit unit) {
                this.threadFactory = threadFactory;
                this.keepAliveTime = unit != null ? unit.toNanos(keepAliveTime) : 0L;
                this.expiringWorkerQueue = new ConcurrentLinkedQueue<ThreadWorker>();
                this.allWorkers = new CompositeSubscription();
    
                ScheduledExecutorService evictor = null;
                Future<?> task = null;
                if (unit != null) {
                    evictor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
                        @Override public Thread newThread(Runnable r) {
                            Thread thread = threadFactory.newThread(r);
                            thread.setName(thread.getName() + " (Evictor)");
                            return thread;
                        }
                    });
                    NewThreadWorker.tryEnableCancelPolicy(evictor);
                    task = evictor.scheduleWithFixedDelay(
                            new Runnable() {
                                @Override
                                public void run() {
                                    evictExpiredWorkers();
                                }
                            }, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS
                    );
                }
                evictorService = evictor;
                evictorTask = task;
            }
    
            ThreadWorker get() {
                if (allWorkers.isUnsubscribed()) {
                    return SHUTDOWN_THREADWORKER;
                }
                while (!expiringWorkerQueue.isEmpty()) {
                    ThreadWorker threadWorker = expiringWorkerQueue.poll();
                    if (threadWorker != null) {
                        return threadWorker;
                    }
                }
    
                // No cached worker found, so create a new one.
                ThreadWorker w = new ThreadWorker(threadFactory);
                allWorkers.add(w);
                return w;
            }
    
            void release(ThreadWorker threadWorker) {
                // Refresh expire time before putting worker back in pool
                threadWorker.setExpirationTime(now() + keepAliveTime);
    
                expiringWorkerQueue.offer(threadWorker);
            }
            //每60s执行一次清除队列中的已过时线程
            void evictExpiredWorkers() {
                if (!expiringWorkerQueue.isEmpty()) {
                    long currentTimestamp = now();
    
                    for (ThreadWorker threadWorker : expiringWorkerQueue) {
                        if (threadWorker.getExpirationTime() <= currentTimestamp) {
                            if (expiringWorkerQueue.remove(threadWorker)) {
                                allWorkers.remove(threadWorker);
                            }
                        } else {
                            // Queue is ordered with the worker that will expire first in the beginning, so when we
                            // find a non-expired worker we can stop evicting.
                            break;
                        }
                    }
                }
            }
    
            long now() {
                return System.nanoTime();
            }
    
            void shutdown() {
                try {
                    if (evictorTask != null) {
                        evictorTask.cancel(true);
                    }
                    if (evictorService != null) {
                        evictorService.shutdownNow();
                    }
                } finally {
                    allWorkers.unsubscribe();
                }
            }
        }
    
        public CachedThreadScheduler(ThreadFactory threadFactory) {
            this.threadFactory = threadFactory;
            this.pool = new AtomicReference<CachedWorkerPool>(NONE);
            start();
        }
    
        @Override
        public void start() {
            CachedWorkerPool update =
                new CachedWorkerPool(threadFactory, KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT);
            if (!pool.compareAndSet(NONE, update)) {
                update.shutdown();
            }
        }
        @Override
        public void shutdown() {
            for (;;) {
                CachedWorkerPool curr = pool.get();
                if (curr == NONE) {
                    return;
                }
                if (pool.compareAndSet(curr, NONE)) {
                    curr.shutdown();
                    return;
                }
            }
        }
    
        @Override
        public Worker createWorker() {
            return new EventLoopWorker(pool.get());
        }
    
        static final class EventLoopWorker extends Scheduler.Worker implements Action0 {
            private final CompositeSubscription innerSubscription = new CompositeSubscription();
            private final CachedWorkerPool pool;
            private final ThreadWorker threadWorker;
            final AtomicBoolean once;
    
            EventLoopWorker(CachedWorkerPool pool) {
                this.pool = pool;
                this.once = new AtomicBoolean();
                this.threadWorker = pool.get();
            }
    
            @Override
            public void unsubscribe() {
                if (once.compareAndSet(false, true)) {
                    // unsubscribe should be idempotent, so only do this once
    
                    // Release the worker _after_ the previous action (if any) has completed
                    threadWorker.schedule(this);
                }
                innerSubscription.unsubscribe();
            }
    
            @Override
            public void call() {
                pool.release(threadWorker);
            }
    
            @Override
            public boolean isUnsubscribed() {
                return innerSubscription.isUnsubscribed();
            }
    
            @Override
            public Subscription schedule(Action0 action) {
                return schedule(action, 0, null);
            }
    
            @Override
            public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {
                if (innerSubscription.isUnsubscribed()) {
                    // don't schedule, we are unsubscribed
                    return Subscriptions.unsubscribed();
                }
    
                ScheduledAction s = threadWorker.scheduleActual(new Action0() {
                    @Override
                    public void call() {
                        if (isUnsubscribed()) {
                            return;
                        }
                        action.call();
                    }
                }, delayTime, unit);
                innerSubscription.add(s);
                s.addParent(innerSubscription);
                return s;
            }
        }
    
        static final class ThreadWorker extends NewThreadWorker {
            private long expirationTime;
    
            ThreadWorker(ThreadFactory threadFactory) {
                super(threadFactory);
                this.expirationTime = 0L;
            }
    
            public long getExpirationTime() {
                return expirationTime;
            }
    
            public void setExpirationTime(long expirationTime) {
                this.expirationTime = expirationTime;
            }
        }
    }
    
    

    在NewThreadWorker里最终设置executor的初始化构造,这里可以看到是一个定时周期任务线程池,核心线程为1.

       /* package */
        public NewThreadWorker(ThreadFactory threadFactory) {
            ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, threadFactory);
            // Java 7+: cancelled future tasks can be removed from the executor thus avoiding memory leak
            boolean cancelSupported = tryEnableCancelPolicy(exec);
            if (!cancelSupported && exec instanceof ScheduledThreadPoolExecutor) {
                registerExecutor((ScheduledThreadPoolExecutor)exec);
            }
            executor = exec;
        }
     public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit) {
            Action0 decoratedAction = RxJavaHooks.onScheduledAction(action);
            ScheduledAction run = new ScheduledAction(decoratedAction);
            Future<?> f;
            if (delayTime <= 0) {
                f = executor.submit(run);
            } else {
                f = executor.schedule(run, delayTime, unit);
            }
            run.add(f);
    
            return run;
        }
    
    
        public ScheduledThreadPoolExecutor(int corePoolSize,
                                           ThreadFactory threadFactory) {
            super(corePoolSize, Integer.MAX_VALUE,
                  DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
                  new DelayedWorkQueue(), threadFactory);
        }
    

    技术方案分析

    本线程管理方案特性

    1. 几乎0侵入代码,只需要在Application初始化。如果要使用优先级,则需要对改变优先级的任务的代码块传入参数,属于正常侵入。
    2. 线程创建、执行的流程优化,更适合应用APP利用线程池执行任务
    3. 线程的执行代码块定位和耗时统计等基本信息收集,以及以此为基础的任务追踪频度,比如某个阶段的线程开启情况和任务耗时,任务执行频度统计。
    4. 避免pthread_create、Rxjava IO scheduler线程开启过多导致的OOM等问题
    5. 任务优先级执行,可在性能不足时丢弃执行可抛弃类型的任务
    6. 提供当前应用总线程数和其他线程名字的打印输出
    7. 可监控子线程中再次切换线程的情况,避免多余线程开销

    本方案提供一些自定义的基础类,也可以选择实现接口,自由替换为自己想要的策略。

    方案分析

    主要是利用RxJava本身RxJavaPlugins做替换线程池的操作。然后自定义一些scheduler所需要的类,完成相关逻辑。

    RxJavaPlugins.getInstance().registerSchedulersHook(new RxJavaSchedulersHookImpl());
    

    管理逻辑类:IOTaskPriorityType、IOMonitorManager、AppSchedulers
    IOMonitorManager:负责调用基础监控类,并且替换默认RxJava的IO scheduler,作为主要的逻辑管理类,负责初始化整个监控模块,以及配置基本参数。

    AppSchedulers作为替换Schedulers的存在,用于应用层显式地使用自定义IO Scheduler,区别于原来的RxJava的Sheduler.IO。

    IOTaskPriorityType:定义不同的任务类型,用于区分管理。

    大致结构如下图所示。

    IO scheduler线程监控结构图.png

    改造后的任务执行流程也做了相应的改变,有别于上面提到的原生流程。对什么时候创建线程做了调整,优先在限制线程池大小范围内创建新线程执行任务,而不是以前的无限创建或者先阻塞队列等待执行。为了方便对比,把上面的图片也拷贝下来。

    线程池替换后的任务执行流程.png 原生线程池提交流程

    使用说明

    关于custominterface包:自定义IO scheduler或者scheduler所使用的线程池时,需要关注这个包下面的接口和抽象类
    customScheduler包:已经自定义好的scheduler相关以及提供的基础线程池,可以参考这里的实现,去自定义应用自己的线程池管理的scheduler

    大部分时候你只需要关心IOMonitorManager这个入口管理类,其它只在需要自定义或者策略改动时才修改。

    /*
     * description  自定义IO线程监控管理类,配置监视器的基本参数,开启各种debug方法等
     * 监控要求:
     * 1、自定义的scheduler需要继承AbstractScheduler
     * 2、参考或者直接使用ExecutorSchedulerWorker创建任务
     * 3、如果是要替换原始RxJava的IO线程池,需要额外实现IThreadPool,创建自己的线程池类
     * 4、IThreadPool的实现类里,线程池的构造使用MonitorThreadPoolExecutor类,便于监控
     * 5、编写新的scheduler参考自定义的IOScheduler类
     * 6、如果需要自定义SchedulerWork,需要实现Runnable, IBaseWork 接口,继承Scheduler.Worker
     * 7、默认提供IOThreadPool和LimitCoreThreadPool两种基础线程池,还有自定义的IOScheduler(用来替换原本RxJava的IOScheduler)
     * 8、IOTaskPriorityType优先级类型,RxJava observable在subscribeOn时可以选择传入
     * 9、AbstractRejectedExecutionHandler可以做一些拒绝任务的策略动作
     *
     * @see ExecutorSchedulerWorker
     * 使用方式:
     * 1、所有public方法提供配置参数
     * 2、在基础参数配置完后调用此方法startMonitor
     * modify by
     */
    
    
       //必须在应用第一次使用observable之前设置,这里会替换rxjava的默认IO scheduler。
                    // 如果只调用setReplaceIOScheduler方法,则替换时用基础库里自带的自定义IO scheduler
                    //LimitCoreThreadPool不是基础库默认的IO scheduler实现,一般都是替换线程池实现,不直接修改自定义的IO scheduler
                    IOMonitorManager.getInstance().setReplaceIOScheduler(true)
                            .setIOThreadPool(LimitCoreThreadPool.getInstance()
                                    .build(2, BuildConfig.DEBUG ? 35 : 35, 15, 1000, false));
    
                    List<String> targetList = new ArrayList<>(2);
                    targetList.add("com.xtc");
                    //配置基本的监视器参数,下面参数可以在IOMonitorManager的startMonitor后修改
                    IOMonitorManager.getInstance().setCostTimeLimit(0)
                            //超出这个活跃线程数就输出到日志
                            .setThreadActiveCountLimit(30)
                            //打印当前被监控的线程池信息
                            .setLogThreadPoolInfo(BuildConfig.DEBUG)
                            //打印其它非RxJava的IO线程信息
                            .setLogOtherThreadInfo(true)
                            .setPackageName(getPackageName())
    //                        .setTargetNameList(list)
                            //监控器轮询时间,每隔这么久打印一些线程信息
                            .setMonitorIntervalTime(10)
                            //是否输出更多日志信息,看方法注释
                            .setLogMoreInfo(BuildConfig.DEBUG)
                            //监控器是否启用
                            .setMonitorEnable(BuildConfig.DEBUG)
                            //一般在调试时才开启,监控子线程重复切换线程
                            .setLogRepeatChangeThread(false)
                            //打印所有任务执行情况,适合打桩分析当前时间段有哪些任务触发
                            .setLogAllTaskRunningInfo(true)
                            //是否过滤堆栈
                            .setFilterStack(!BuildConfig.DEBUG);
    
    

    源码

    下载地址

    相关文章

      网友评论

        本文标题:RxJava自带线程池监控和管理的探索之旅

        本文链接:https://www.haomeiwen.com/subject/yfyenqtx.html