美文网首页java进阶
Rxjava原理(二)--线程调度

Rxjava原理(二)--线程调度

作者: Jack_Ou | 来源:发表于2021-01-20 09:14 被阅读0次

    1. 创建线程池和线程管理策略分析

    // 在开发中使用Rxjava来完成线程切换会调用到以下方法(还有几个就不一一列举了,原理一样的),那么就从这里开始分析
    Schedulers.io()
    Schedulers.computation()
    Schedulers.newThread()
    AndroidSchedulers.mainThread()
    

    当我们调用以上方法中的任意一个,都会调到Schedulers类中,Schedulers使用策略模式封装了所有线程切换策略(因此后面以io()分析)。

    // 1. Schedulers类中,静态创建IOTask(),当调用Schedulers.io()的时候,就是返回这个Callable.
    static {
        SINGLE = RxJavaPlugins.initSingleScheduler(new SingleTask());
    
        COMPUTATION = RxJavaPlugins.initComputationScheduler(new ComputationTask());
    
        IO = RxJavaPlugins.initIoScheduler(new IOTask());
    
        TRAMPOLINE = TrampolineScheduler.instance();
    
        NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
    }
    
    // 2.创建IoScheduler
    static final class IOTask implements Callable<Scheduler> {
            @Override
            public Scheduler call() throws Exception {
                return IoHolder.DEFAULT;
            }
        }
    
    static final class IoHolder {
            static final Scheduler DEFAULT = new IoScheduler();
        }
    
    // 3.创建线程池
    public IoScheduler(ThreadFactory threadFactory) {
            this.threadFactory = threadFactory;
            this.pool = new AtomicReference<CachedWorkerPool>(NONE);
            start();
        }
    public void start() {
            // CachedWorkerPool任务池,里面持有任务队列和线程池
            CachedWorkerPool update = new CachedWorkerPool(KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT, threadFactory);
            if (!pool.compareAndSet(NONE, update)) {
                update.shutdown();
            }
        }
    
    //4. CachedWorkerPool构造方法中创建线程池,并且暴露get()提供需要执行的任务
    static final class CachedWorkerPool implements Runnable {
            private final long keepAliveTime;
            private final ConcurrentLinkedQueue<ThreadWorker> expiringWorkerQueue;
            final CompositeDisposable allWorkers;
            private final ScheduledExecutorService evictorService;
            private final Future<?> evictorTask;
            private final ThreadFactory threadFactory;
    
           CachedWorkerPool(long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
                ......
                if (unit != null) {
                    // 创建线程池
                    evictor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY);
                    task = evictor.scheduleWithFixedDelay(this, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS);
                }
                ......
            }
        
            ThreadWorker get() {
                .....
                while (!expiringWorkerQueue.isEmpty()) {
                    // 任务队列不为空,从队列中取一个并返回
                    ThreadWorker threadWorker = expiringWorkerQueue.poll();
                    if (threadWorker != null) {
                        return threadWorker;
                    }
                }
    
                // 如果任务队列是空的,就创建一个并返回
                ThreadWorker w = new ThreadWorker(threadFactory);
                allWorkers.add(w);
                return w;
            }
        ......
    }
    

    用一张图可能说明得比较清楚一些。

    Schedulers调度过程.png

    2. Rxjava上游任务在子线程中执行分析

    // 上游线程切换使用过程
    Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> e) throws Exception {
            e.onNext("JackOu");
        }
    })
      // ObservableCreate.subscribeOn
      .subscribeOn(Schedulers.io())
      // ObservableSubscribeOn.subscribe
      .subscribe(new Observer<String>() {
          ......
    
          @Override
          public void onNext(String s) {
    
          }
            ......
      });
    

    从上面使用过程的代码看下面的图,分析Rxjava封装任务和抛任务到线程池的过程。

    上游任务在线程池执行流程图.png

    当我们一订阅(调用subscribe(Observer)方法)的时候,Rxjava将会把上游需要执行的任务和下游的观察者经过层层包裹,包裹好之后,就会得到一个Scheduler.Worker任务对象。当调用发射器的onNext的方式的时候,结合第一小节的图片,ObservableSubscribeOn就会将任务抛到线程池执行,在子线程中执行任务并且返回,从而完成线程切换功能。

    3. Rxjava下游任务在主线程中执行分析

    3.1 创建AndroidSchedulers.mainThread的过程

    如第一节Schedulers的创建流程一样,当调用AndroidSchedulers.mainThread()之后,最终会创建HandlerScheduler。

    // 1.创建HandlerScheduler,并且传入MainLooper
    public final class AndroidSchedulers {
    
        private static final class MainHolder {
            // 创建HandlerScheduler
            static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
        }
    
        private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
                new Callable<Scheduler>() {
                    @Override public Scheduler call() throws Exception {
                        return MainHolder.DEFAULT;
                    }
                });
    
        public static Scheduler mainThread() {
            return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
        }
    }
    
    // 2.当创建任务的时候,创建HandlerWorker
    final class HandlerScheduler extends Scheduler {
        private final Handler handler;
    
        HandlerScheduler(Handler handler) {
            this.handler = handler;
        }
    
        @Override
        public Worker createWorker() {
            return new HandlerWorker(handler);
        }
    }
    
    // 3.当执行任务的时候
    private static final class HandlerWorker extends Worker {
            private final Handler handler;
        
            HandlerWorker(Handler handler) {
                this.handler = handler;
            }
    
            @Override
            public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
                ......
                // 包装任务
                run = RxJavaPlugins.onSchedule(run);
                ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
                
                // 创建Message包装任务
                Message message = Message.obtain(handler, scheduled);
                message.obj = this; 
                
                // 发送任务到MainLooper中,该任务就在主线程中执行了
                handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));
    
                ......
                return scheduled;
            }
    }
    

    其实真正将任务放在主线程中执行就是上面三个步骤,但是Rxjava增加了很多其他功能,例如解除订阅(将任务包装在Disposable中),增加hook功能(在任务外面在包装了ScheduledRunnable)等等,其最内层的本质就是我们需要执行的任务。细化的包裹情况如下图:

    主线程执行任务.png

    4.多个线程切换,以哪个为准

    如下面代码,我们作死得切换线程,那么哪些线程会最终执行我们的任务呢

    Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> e) throws Exception {
            e.onNext("JackOu");
        }
    })
      .subscribeOn(Schedulers.io())    // 上游切换,靠近上游的生效
      .subscribeOn(Schedulers.newThread())
      .subscribeOn(Schedulers.computation())
      
      .observeOn(Schedulers.io())
      .observeOn(Schedulers.computation())
      .observeOn(AndroidSchedulers.mainThread())  // 下游切换,靠近下游的生效
      .subscribe(new Observer<String>() {
          ......
          @Override
          public void onNext(String s) {
    
          }
          ......
      });
    

    我们可以从第二节和第三节看出,当我们每调用一次subscribeOn方法上游就会多包装一层Scheduler,在订阅之后,解包裹的时候越靠近“待执行任务”的subscribeOn越后解包,所以最靠近任务的subscribeOn调用会是最终被执行,也就是最终被执行的线程。

    因此我们可以总结得到:

    总结一: 在多次调用线程切换的时候,第一次调用subscribeOn的线程切换会是最后执行任务的线程;最后调用observeOn切换的线程会是最后执行的线程。

    总结二:从调用关系来看,越靠近上游的线程切换,将是最终执行任务的线程;越靠近下游的线程切换,将是最终执行任务的线程。

    相关文章

      网友评论

        本文标题:Rxjava原理(二)--线程调度

        本文链接:https://www.haomeiwen.com/subject/hibqzktx.html