美文网首页Android架构师
RxJava的线程切换

RxJava的线程切换

作者: 程序员DS | 来源:发表于2022-06-20 17:35 被阅读0次

    RxJava 线程切换

    前言

    在上篇文章对RxJava 的工作流程进行的简单的分析,今天来分享一下线程切换的流程。如果觉得源码枯燥可以直接移至文末看图理解。

    实例代码

    Observable.create(new ObservableOnSubscribe<Object>() {
                @Override
                public void subscribe(@NonNull ObservableEmitter<Object> emitter) {
                    emitter.onNext("123");
                }
            })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Observer<Object>() {
            .........
            }
    
    

    我们都subscribeOn 是切换上游线程,observeOn是切换下游环境,接下来我们就看下它是怎么切换的。

    我们先看下 Schedulers.io() 是什么

    @NonNull
    public static Scheduler io() {
        return RxJavaPlugins.onIoScheduler(IO);
    }
    ->
    Scheduler IO = RxJavaPlugins.initIoScheduler(new IOTask());  
    ->
    static final class IOTask implements Supplier<Scheduler> {
            @Override
            public Scheduler get() {
                return IoHolder.DEFAULT;
            }
        }  
    ->
    static final class IoHolder {
            static final Scheduler DEFAULT = new IoScheduler();
     } 
    
    

    我们通过层层分解,层层递进,了解到 Schedulers.io() 最终返回的是一个 IoScheduler()

    可以暂时将它理解为一个任务调度器,用来执行我们的任务。

    接下来我们在看下 AndroidSchedulers.mainThread() 。

    public static Scheduler mainThread() {
        return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
    }
    ->
     private static final Scheduler MAIN_THREAD =
            RxAndroidPlugins.initMainThreadScheduler(() -> MainHolder.DEFAULT);
    ->
     Scheduler DEFAULT  = new HandlerScheduler(new Handler(Looper.getMainLooper()), true);
    
    

    可以看到这里返回的是一个 HandlerScheduler ,这里是对Handler进行了一个封装,所以归根结底,向主线程切换任务还是通过handler 来完成的,接下来我们就看看其中的细枝末节。

    两个参数分析完了,然后来看下两个操作符,看看它俩做了些什么事情。

    首先我们来看下subscribeOn

    public final Observable<T> subscribeOn(@NonNull Scheduler scheduler) {
        Objects.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<>(this, scheduler));
    }
    
    

    这里是对 IoScheduler() 和上游封装的包裹进行的二次封装

    上游的包裹:这里指的通过create创建的 ObservableCreate

    observeO操作符:

    public final Observable<T> observeOn(@NonNull Scheduler scheduler) {
        return observeOn(scheduler, false, bufferSize());
    }
    ->
     public final Observable<T> observeOn(@NonNull Scheduler scheduler, boolean delayError, int bufferSize) {
            Objects.requireNonNull(scheduler, "scheduler is null");
            ObjectHelper.verifyPositive(bufferSize, "bufferSize");
            return RxJavaPlugins.onAssembly(new ObservableObserveOn<>(this, scheduler, delayError, bufferSize));
        }
    
    

    在这里是对 HandlerScheduler 和 上游封装的包裹进行的二次封装

    上游的包裹:这里指的是通过subscribeOn 操作符 创建的 ObservableSubscribeOn

    然后我们开始看订阅这里了,我们的流程从 subscribe 这里才刚刚开始。

    经过上一篇文章的分析,我们可以知道调用的 subscribeActual 方法都是在上游操作符创建的封装对象里,所以我们直接看 ObservableObserveOn 的 subscribeActual 方法。

    如果感觉这段讲解有些跳跃,可以先看一下上篇文章《浅析RxJava》

        @Override
        protected void subscribeActual(Observer<? super T> observer) {
             .........
             Scheduler.Worker w = scheduler.createWorker();
             source.subscribe(new ObserveOnObserver<>(observer, w, delayError, bufferSize));
        }
    
    

    在这里是将我们自定义的 Observer 封装成 ObserveOnObserver ,这里的source 是我们上游的封装的包裹,这里指的就是通过subscribeOn 操作符创建的 ObservableSubscribeOn。最终会调用到 ObservableSubscribeOn 的 subscribeActual

    方法。

    @Override
    public void subscribeActual(final Observer<? super T> observer) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<>(observer);
        observer.onSubscribe(parent);
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }
    
    

    这里是将下游传上来的 ObserveOnObserver 再次进行封装,封装成 SubscribeOnObserver ,然后再将 SubscribeOnObserver 封装成 SubscribeTask,其实就是一个Runnable。

    流程我们稍后再进行分析,我们先来看看任务是异步任务是怎么切换的。

    通过上文分析得知,此处的 scheduler 为 subscribeOn 操作符传入的参数,也就是 IoScheduler() 。

    接下来我们再看 scheduler的 scheduleDirect 方法。

    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        final Worker w = createWorker();
        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
        DisposeTask task = new DisposeTask(decoratedRun, w);
        w.schedule(task, delay, unit);
        return task;
    }
    
    

    在这里是通过 createWorker() 创建了一个 Worker ,由这个 Worker 去执行 具体的任务。

    createWork()是抽象方法,我们需要看IoScheduler 的具体实现。

    public Worker createWorker() {
        return new EventLoopWorker(pool.get());
    }
    ->
     public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
     ....
      return threadWorker.scheduleActual(action, delayTime, unit, tasks);
     }
    ->
      NewThreadWorker类
      public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
               Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    
               ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
               .........
               if (delayTime <= 0) {
                    f = executor.submit((Callable<Object>)sr);
                } else {
                    f = executor.schedule((Callable<Object>)sr, delayTime, unit);
                }
                sr.setFuture(f);
            return sr;
        }  
    
    

    经过层层挖掘,我们看到任务最后是通过 executor 来执行的,executor 就是内部维护的线程池

    private final ScheduledExecutorService executor;
    
    

    至此,整个工作流 就切换为了子线程来工作。

    接下来我们继续分析封装的SubscribeTask

    final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;
        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }
        @Override
        public void run() {
            source.subscribe(parent);
        }
    }
    
    

    在任务执行的时候,会通过source 继续将 SubscribeOnObserver向上游传送。这里的source 指的是create 创建的ObservableCreate ,source.subscribe 就会直接调用到 ObservableCreate 的 subscribeActual

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<>(observer);
        observer.onSubscribe(parent);
        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
    
    

    一直到这里,跟我们在上篇文章分析的流程就一样了。

    将下游传过来的SubscribeOnObserver 再次封装成 CreateEmitter 发射器,然后通过source 继续向上传递,这里的souce 就是指的是我们在create 中传递进去的ObservableOnSubscribe。

    然后在ObservableOnSubscribe 的 subscribe 中,通过 emitter.onNext 将我们的数据开始进行下发。

    ObservableEmitter
    @Override
    public void onNext(T t) {
        ........
        if (!isDisposed()) {
            observer.onNext(t);
        }
    }
    
    

    这里的observer 是SubscribeOnObserver

    SubscribeOnObserver
    @Override
    public void onNext(T t) {
        downstream.onNext(t);
    }
    
    

    这里的downstream 是指 ObservableObserveOn

    ObserveOnObserver
    @Override
    public void onNext(T t) {
       ...... 
       if (sourceMode != QueueDisposable.ASYNC) {
         queue.offer(t);
       }  
        schedule();
    }
    
    

    这里的 sourceMode 未被赋值,会调用 queue.offer(t) ,将数据放入到队列中。

    接下来再看 schedule() 做了些什么 ?

    void schedule() {
        if (getAndIncrement() == 0) {
            worker.schedule(this);
        }
    }
    
    

    通过上文的分析,我们可以知道 observeOn 操作符创建的 Scheduler 为 HandlerScheduler ,所以这里的 worker.schedule(this) 方法调用的是 HandlerScheduler 的内部静态子类 HandlerWorker 的 schedule 方法。

    public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
        ...........
        if (disposed) {
            return Disposable.disposed();
        }
        run = RxJavaPlugins.onSchedule(run);
        ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
        Message message = Message.obtain(handler, scheduled);
        message.obj = this;
        if (async) {
            message.setAsynchronous(true);
        }
        handler.sendMessageDelayed(message, unit.toMillis(delay));
        if (disposed) {
            handler.removeCallbacks(scheduled);
            return Disposable.disposed();
        }
        return scheduled;
    }
    
    

    最终是在这里通过Handler将任务切换到了主线程执行。

    ObserveOnObserver 类实现了Runnable 接口, worker.schedule(this) 是将自身交给Handler 去执行。所以最终的结果还会由 ObserveOnObserver 的run方法来执行。

    public void run() {
        if (outputFused) {
            drainFused();
        } else {
            drainNormal();
        }
    }
    
    

    这里我们是典型的使用方式,我们直接来看下 drainNormal();

    void drainNormal() {
        final SimpleQueue<T> q = queue;
        final Observer<? super T> a = downstream;
        for (;;) {
            ............
            for (;;) {
                T v = q.poll();
                a.onNext(v);
            }
            ..............
        }
    }
    
    

    在这里 将数据从队列中取出,然后调用下游的 onNext ,这里的 downstream 也就是我们最后自定义的观察者 Observer 了。

    整个过程也好比是一个封包裹和拆包裹的过程。用洋葱模型表示一下会更加的形象。

    最后上图!

    可能文字的叙述还是太抽象, 用这样一张图来表示整个流程可能相对好理解一些。

    写在最后

    纸上得来终觉浅,绝知此事要躬行。如果有时间还是建议自己跟一遍源码流程,这样才能真正理解。

    顺便给大家分享一份《Android百大框架源码解析》,需要的小伙伴可以在公众号免费获取~

    《Android百大框架源码解析》

    相关文章

      网友评论

        本文标题:RxJava的线程切换

        本文链接:https://www.haomeiwen.com/subject/fjpjvrtx.html