Rxjava-订阅和线程切换简单分析

作者: C_Sev | 来源:发表于2020-12-22 23:57 被阅读0次

    喜欢的话,评论区留言点赞哦。

    Wechat: CoolOriLans 酷奇源语

    本篇文章简单分析下 Rxjava 订阅和线程切换简单:

    我们先来看一个订阅的流程,这个流程是在之前的基础上,加了些调味剂:

    val observable = Observable.create<Int>(object : ObservableOnSubscribe<Int> { // 返回的是 ObservableCreate 实例;
        override fun subscribe(emitter: ObservableEmitter<Int>) {
            emitter.onNext(1)
            emitter.onComplete()
        }
    })
    val observer = object : Observer<Int> {
        override fun onSubscribe(d: Disposable) {
            val d1 = d // 关键点 4: 这个 d 是 ObservableSubscribeOn.SubscribeOnObserver 对象。
            Log.d("rxjava", "Disposable: " +  d1.javaClass.canonicalName);
            // rxjava  : Disposable: io.reactivex.internal.operators.observable.ObservableSubscribeOn.SubscribeOnObserver
        }
        override fun onNext(t: Int) {}
        override fun onError(e: Throwable) {}
        override fun onComplete() {}
    }
    observable // 关键点 1: 是 ObservableCreate 实例;
    .observeOn(Schedulers.io()) // 关键点 2: 切换线程1,返回的是 ObservableObserveOn 实例;
    .subscribeOn(Schedulers.single()) // 关键点 3: 切换线程2,返回的是 ObservableSubscribeOn 实例;
    .subscribe(observer)
    

    本片文章也就是要弄懂上面的这个 4 个关键点:

    注意:

    1、我们是先调用 observeOn 再调用 subscribeOn,下面的分析也是按照这个顺序,实际上二者的顺序可以不一样。

    这里先简单的总结下(其实网上的很多博客都有讲到,在这里我也总结了下)

    subscribeOn 和observeOn 都是用来切换线程用的,但是区别如下:

    • subscribeOn:决定 Observable 对象在哪个线程上执行,其能改变调用它之前代码的线程;
    • observeOn:决定 Subscriber(Observer) 对象在哪个线程上执行,其能改变调用它之后代码的线程;

    那么具体是怎么实现的,我们往下慢慢的跟踪代码了;

    1 Schedulers - 简单分析

    Schedulers 是 Rxjava 的调度器,用于为流提供线程环境,这里先简单的看下,后面会分析:

    public final class Schedulers {
        @NonNull
        static final Scheduler SINGLE; // 单线程模式
    
        @NonNull
        static final Scheduler COMPUTATION;
    
        @NonNull
        static final Scheduler IO;
    
        @NonNull
        static final Scheduler TRAMPOLINE;
    
        @NonNull
        static final Scheduler NEW_THREAD;
    
        ... ... ... ...
    }
    

    Schedulers 默认提供了如上的线程实现。

    1.1 Schedulers.io

    这里先简单的看下 IO 的实现吧,这样方便我们对线程的切换进行分析:

        public static Scheduler io() {
            // RxJavaPlugins.onIoScheduler 是 hook 用的,这里不关注
            return RxJavaPlugins.onIoScheduler(IO);
        }
    

    实际上默认返回的就是上面的 Schedulers.IO:

    public final class Schedulers {
    
        static {
            SINGLE = RxJavaPlugins.initSingleScheduler(new SingleTask());
    
            COMPUTATION = RxJavaPlugins.initComputationScheduler(new ComputationTask());
    
            // 这里会创建 IO,RxJavaPlugins.initIoScheduler 依然是 Hook 专用的
            // 默认返回 IOTask
            IO = RxJavaPlugins.initIoScheduler(new IOTask());
    
            TRAMPOLINE = TrampolineScheduler.instance();
    
            NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
        }
    }
    

    我们去看看 IOTask:

    1.2 IOTask

        static final class IOTask implements Callable<Scheduler> {
            @Override
            public Scheduler call() throws Exception {
                return IoHolder.DEFAULT;
            }
        }
    

    1.3 IoHolder

        static final class IoHolder {
            // 最终实现为 IoScheduler;;
            static final Scheduler DEFAULT = new IoScheduler();
        }
    

    实际上:Schedulers 只是一个包装而已,真正的线程切换是基于平台提供的线程工具的:线程池;

    这个我们后面再分析哦。

    2 Observable

    2.1 observeOn

    observeOn 用于指定被观察者逻辑所在的 Scheduler:

    Observable.create 返回的是一个 ObservableCreate 实例:

        public final Observable<T> observeOn(Scheduler scheduler) {
            return observeOn(scheduler, false, bufferSize());
        }
    
        public final Observable<T> observeOn(Scheduler scheduler, boolean delayError) {
            return observeOn(scheduler, delayError, bufferSize());
        }
    
        // 核心方法 3
        public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
            ObjectHelper.requireNonNull(scheduler, "scheduler is null");
            ObjectHelper.verifyPositive(bufferSize, "bufferSize");
            // RxJavaPlugins.onAssembly 用于 Hook 操作,这里不关注
            //【-->3】默认返回 ObservableObserveOn 实例:
            return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
        }
    

    observeOn 有三个重载函数。

    最终调用的都是核心方法 3

    可以看到,默认其返回了一个 ObservableObserveOn 对象。将 ObservableCreate 包装到了内部,如下;

    ObservableObserveOn [ // --> 3
        ObservableCreate [ 
            ObservableOnSubscribe [  
                emitter: ObservableEmitter<Int>
            ]  
        ]  
    ]
    

    其实脉络已经很清楚了;

    2.2 subscribeOn

    observeOn 用于指定观察者逻辑所在的 Scheduler:

        public final Observable<T> subscribeOn(Scheduler scheduler) {
            ObjectHelper.requireNonNull(scheduler, "scheduler is null");
            //【-->4.x】进入;
            return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
        }
    

    可以看到,默认其返回了一个 ObservableSubscribeOn 对象。将 ObservableCreate 包装到了内部!

    如下:

    ObservableSubscribeOn [ // --> 4
        ObservableObserveOn [ // --> 3
            ObservableCreate [ 
                ObservableOnSubscribe [  
                    emitter: ObservableEmitter<Int>
                ]  
            ]  
        ]
    ]
    

    其实脉络已经很清楚了;

    3 ObservableObserveOn

    3.1 constructor

    ObservableObserveOn 的父类 AbstractObservableWithUpstream 继承了 Observable,所以 ObservableObserveOn 具有 Observable 的能力:

    public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> { // 如下:
        final Scheduler scheduler;
        final boolean delayError;
        final int bufferSize;
        
        // 参数 Scheduler 是调度器;
        public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
            super(source); // 是 ObservableCreate 实例;
            this.scheduler = scheduler;
            this.delayError = delayError;
            this.bufferSize = bufferSize;
        }
        
    

    按照例子的顺序,这里的 ObservableSource<T> source 是 ObservableCreate 实例!

    abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T> {
        protected final ObservableSource<T> source;
        ... ... ...
    }
    
    public interface HasUpstreamObservableSource<T> {
        ObservableSource<T> source();
    }
    

    3.2 subscribeActual

    我们回忆下 subscribe 的流程,subscribe 实际上最后会调用 subscribeActual

    但是此时执行完 observeOn 后,返回的是 ObservableObserveOn,其将 ObservableCreate 封装在了内部,所以会先执行 ObservableObserveOn.subscribeActual

        @Override
        protected void subscribeActual(Observer<? super T> observer) { //【-->4.3】是 SubscribeOnObserver 实例
            // 核心 1: 如果 scheduler 是 TrampolineScheduler 类型的,那么会直接执行订阅
            // 对观察者 observer 不做任何的包装;
            if (scheduler instanceof TrampolineScheduler) {
                source.subscribe(observer);
            } else {
                // 核心 2: 如果是其他类型的,那么这里会通过 scheduler 创建对应的 Worker
                Scheduler.Worker w = scheduler.createWorker();
                // 核心 3: 然后将观察者 observer 封装成一个 【-->3.3】ObserveOnObserver,然后在执行订阅;
                // source 是【-->6.1】 ObservableCreate 实例;
                source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
            }
        }
    

    这样,相当于发射器通过 ObserveOnObserver 委托代理执行 Observer 的方法了;

    source 是 ObservableCreate 实例,这里最终会调用 ObservableCreate.subcribeActual。

    只不过参数由 Observer 变成了 ObserveOnObserver(Observer)。

        ObservableObserveOn [ // --> 3
            ObservableCreate [ 
                ObservableOnSubscribe [  
                    emitter: ObservableEmitter<Int>
                ]  
            ]  
        ].subscribe -> subscribeActual {
            ObserveOnObserver [
                SubscribeOnObserver [ // 下面返回的;
                    
                ]
            ]
        }
    

    3.3 ObserveOnObserver - 切换核心

    ObserveOnObserver 是 ObservableObserveOn 的内部类:

    3.3.1 Constructor

       static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
       implements Observer<T>, Runnable {
    
            private static final long serialVersionUID = 6576896619930983584L;
            final Observer<? super T> downstream; //【1】下游的观察者;
            final Scheduler.Worker worker; //【2】线程调度;
            final boolean delayError;
            final int bufferSize;
    
            SimpleQueue<T> queue;
    
            Disposable upstream; // 表示上游的数据源,
    
            Throwable error;
            volatile boolean done;
    
            volatile boolean disposed;
    
            int sourceMode;
    
            boolean outputFused;
    
            ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
                this.downstream = actual; //【3】表示下游的数据接收方 Observer 实例,本例中是 SubscribeOnObserver 实例;
                this.worker = worker; //【4】线程调度;
                this.delayError = delayError;
                this.bufferSize = bufferSize;
            }
           ... ... ...
       }
    

    由于 ObserveOnObserver 本质也是一个 Observer,他代理了 Observer 一系列方法,我们去看看

    3.3.2 onSubscribe

    在订阅发生之前会先执行 onSubscribe 方法:

    参数 Disposable d 就是 CreateEmitter 发射器!

            @Override
            public void onSubscribe(Disposable d) {
                if (DisposableHelper.validate(this.upstream, d)) {
                    // 核心 1: CreateEmitter 作为发射器,用于发射数据;
                    this.upstream = d;
                    if (d instanceof QueueDisposable) {
                        @SuppressWarnings("unchecked")
                        QueueDisposable<T> qd = (QueueDisposable<T>) d;
    
                        int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);
    
                        if (m == QueueDisposable.SYNC) {
                            sourceMode = m;
                            queue = qd;
                            done = true;
                            downstream.onSubscribe(this);
                            schedule();
                            return;
                        }
                        if (m == QueueDisposable.ASYNC) {
                            sourceMode = m;
                            queue = qd;
                            downstream.onSubscribe(this);
                            return;
                        }
                    }
    
                    queue = new SpscLinkedArrayQueue<T>(bufferSize);
                    // 核心 2: 调用下游的 Observer 的 onSubscribe 方法,将自身传递下去!
                    // downstream 是 SubscribeOnObserver
                    downstream.onSubscribe(this);
                }
            }
    

    3.3.3 onNext

            @Override
            public void onNext(T t) {
                if (done) {
                    return;
                }
                if (sourceMode != QueueDisposable.ASYNC) {
                    // 把执行的结果保存到内部队列里;
                    queue.offer(t);
                }
                schedule();
            }
    

    3.3.4 onError

            @Override
            public void onError(Throwable t) {
                if (done) {
                    RxJavaPlugins.onError(t);
                    return;
                }
                // 保存错误到 error.
                error = t;
                done = true;
                schedule();
            }
    

    3.3.5 onComplete

            @Override
            public void onComplete() {
                if (done) {
                    return;
                }
                done = true;
                schedule();
            }
    

    4 ObservableSubscribeOn

    4.1 constructor

    同样的,ObservableSubscribeOn 也继承了 AbstractObservableWithUpstream 类,也具有 Observable 的能力:

    public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> { // 如下:
        final Scheduler scheduler;
    
        public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
            super(source); // 父类的变量;
            this.scheduler = scheduler;
        }
    

    注意:按照例子的顺序,这里的 ObservableSource<T> source 是 ObservableObserveOn 实例!(这里不一定是这样,因为调用顺序可以切换)

    abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T> {
    
        /** The source consumable Observable. */
        protected final ObservableSource<T> source;
        
        ... ... ...
    }
    

    4.2 subscribeActual - 切换核心

        @Override
        public void subscribeActual(final Observer<? super T> observer) {
            //【-->4.3】将 observer 包装成 SubscribeOnObserver 实例;;
            final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
            // 核心:这里立刻回调,这个回调会触发器上面的关键点 4:
            observer.onSubscribe(parent);
            //【-->】创建了 SubscribeTask 实例,将 SubscribeOnObserver 保存到内部;;
            // SubscribeTask 是一个 Runnable 实例;;
            //【-->4.3.5】scheduler.scheduleDirect 会返回一个 Disposable 对象。
            // SubscribeOnObserver 会持有该对象的原子引用;;
            parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
        }
    

    4.2.1 SubscribeTask - 切换核心

    可以看到 SubscribeTask 实际上就是一个 Runnable 实例;

    这里其实已经能够看到线程的切换的实现:将 Observer 封装到 SubscribeTask ( Runnable ) 中,通过 handler / ThreadPool 执行:

        final class SubscribeTask implements Runnable {
            private final SubscribeOnObserver<T> parent; //【-->4.3】SubscribeOnObserver 实例
    
            SubscribeTask(SubscribeOnObserver<T> parent) {
                this.parent = parent;
            }
    
            @Override
            public void run() {
                // 这里的 source 来自 ObservableSubscribeOn
                // 就是 ObservableObserveOn 实例了;
                //【-->3.2】这样就进入了 ObservableObserveOn 的 subscribeActual 方法; 
                source.subscribe(parent);
            }
        }
    

    4.3 SubscribeOnObserver

    是对 Observer 的封装,继承了 AtomicReference<Disposable> 类,内部持有 Disposable 对象的原子引用:

        static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
    
            private static final long serialVersionUID = 8094547886072529208L;
            final Observer<? super T> downstream; // 下游的观察者;
    
            final AtomicReference<Disposable> upstream;
    
            SubscribeOnObserver(Observer<? super T> downstream) {
                this.downstream = downstream; // 这个就是我们的 obsever
                this.upstream = new AtomicReference<Disposable>();
            }
        }
    

    可以的得到如下的关系图了:

    ObservableSubscribeOn [ // --> 4
        ObservableObserveOn [ // --> 3
            ObservableCreate [ 
                ObservableOnSubscribe [  
                    emitter: ObservableEmitter<Int>
                ]  
            ]  
        ]
    ].subscribe -> subscribeActual {
        SubscribeOnObserver [
            Observer [
                ...
            ]
        ]
    }
    

    4.3.1 onSubscribe

    保存上游的 Disposable 实例:

            @Override
            public void onSubscribe(Disposable d) {
                // 核心 1,这里的 d 是 ObserveOnObserver 实例【-->4.3.1】;
                DisposableHelper.setOnce(this.upstream, d);
            }
    

    4.3.2 onNext

    发送数据:

            @Override
            public void onNext(T t) {
                downstream.onNext(t);
            }
    

    4.3.3 onError

    发送异常:

            @Override
            public void onError(Throwable t) {
                downstream.onError(t);
            }
    

    4.3.4 onComplete

    完成回调:

            @Override
            public void onComplete() {
                downstream.onComplete();
            }
    

    4.3.5 setDisposable - 核心

    设置 Disposable 对象的弱引用,这个 Disposable 对象是由 Scheduler 对 new SubscribeTask(SubscribeOnObserver)) 封装后返回的:

            void setDisposable(Disposable d) {
                DisposableHelper.setOnce(this, d);
            }
    

    4.3.5.1 DisposableHelper.setOnce

    DisposableHelper 枚举类封装了对 Disposable 的操作:

        public static boolean setOnce(AtomicReference<Disposable> field, Disposable d) {
            ObjectHelper.requireNonNull(d, "d is null");
            // 通过 CAS 设置内部的 field 变量为 d;
            if (!field.compareAndSet(null, d)) {
                // 如果失败的话,那么说明内部已经有 Disposable 对象了
                // 那这里会执行 d 的取下操作;
                d.dispose();
                if (field.get() != DISPOSED) {
                    reportDisposableSet();
                }
                return false;
            }
            return true;
        }
    

    不多说了。

    4.3.6 dispose

    取消订阅:

    @Override
    public void dispose() {
        // 清空对上游的弱引用;
        DisposableHelper.dispose(upstream);
        // 清空自身对下游的弱引用:SubscribeTask【-->4.2.1】
        DisposableHelper.dispose(this);
    }
    

    4.3.6 isDisposed

    是否取消订阅:

    @Override
    public boolean isDisposed() {
        return DisposableHelper.isDisposed(get());
    }
    

    5 subscribe 再分析

    接下来,我们来看看,在执行了 observeOn 和 subscribeOn 后的执行逻辑:

    回顾一下:

    observable // 关键点 1: 是 ObservableCreate 实例
    .observeOn(Schedulers.io()) // 关键点 2: 被观察者所在的线程,返回的是 ObservableObserveOn 实例;
    .subscribeOn(Schedulers.single()) // 关键点 3: 观察者所在的线程,返回的是 ObservableSubscribeOn 实例;
    .subscribe(observer)
    

    实际上经过了 observeOn/subscribeOn 方法的执行,整个逻辑变的很复杂了;

    5.1 subscribe 回顾

    subscribe 方法会调用 subscribeActual 方法:

        @SchedulerSupport(SchedulerSupport.NONE)
        @Override
        public final void subscribe(Observer<? super T> observer) {
            try {
                // --> 按照现在的逻辑,进入 4.2 了;
                subscribeActual(observer);
            } catch (NullPointerException e) { // NOPMD
                throw e;
            } catch (Throwable e) {
                ... ... ...
                throw npe;
            }
        }
    

    按照上面的包装顺序,会先调用 ObservableSubscribeOn 的 subscribeActual 方法了;

    6 ObservableCreate - 回顾

    6.1 subscribeActual - 回顾

    回顾下 subscribeActual 方法:

        @Override
        protected void subscribeActual(Observer<? super T> observer) {
            // 核心 1: 创建的 CreateEmitter 实例,包装下 Observer; 
            // 此时的 observer
            CreateEmitter<T> parent = new CreateEmitter<T>(observer);
            // 核心 2: 这里的 observer 就是【-->3.3】ObserveOnObserver 实例;
            observer.onSubscribe(parent);
    
            try {
                // 核心 1: 这里的 source 是 ObservableOnSubscribe 实例;
                // 见【-->6.2】ObservableOnSubscribe
                source.subscribe(parent);
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                parent.onError(ex);
            }
        }
    

    6.2 ObservableOnSubscribe

    回顾下 ObservableOnSubscribe

    public interface ObservableOnSubscribe<T> {
        void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception;
    }
    

    在这个方法中,我们使用:

    emitter.onNext(1)
    emitter.onComplete()
    

    6.2.1 onNext

    发射数据:

    @Override
    public void onNext(T t) {
        if (t == null) {
            onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
            return;
        }
        if (!isDisposed()) {
            // 这里的 observer 实际上是【-->3.3】ObserveOnObserver 实例;
            observer.onNext(t);
        }
    }
    

    7 总结

    总结一下哈

    7.1 完整订阅的流程

    我们回顾下整个流程,下面的这个流程包含了线程切换的流程,以及调度的走势:

    (好累,后序补上哦)

    7.2 线程切换的关键

    被观察者的线程切换的 Mr.Key : ObservableObserveOn 和 ObserveOnObserver

    观察者的线程切换的 Mr.Key : ObservableSubscribeOn 和 SubscribeOnObserver

    7.2.1 ObservableObserveOn 的关键点

    • ObservableObserveOn 内部会讲 Scheduler 保存下来
        // 参数 Scheduler 是调度器;
        public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
            super(source); // 是 ObservableCreate 实例;
            this.scheduler = scheduler;
    
    • subscribeActual 方法,这里会实现订阅,用 ObserveOnObserver 封装 observer,同时传入 Scheduler.Worker 作为调度器:
        @Override
        protected void subscribeActual(Observer<? super T> observer) { //【-->4.3】是 SubscribeOnObserver 实例
            // 核心 1: 如果 scheduler 是 TrampolineScheduler 类型的,那么会直接执行订阅
            // 对观察者 observer 不做任何的包装;
            if (scheduler instanceof TrampolineScheduler) {
                source.subscribe(observer);
            } else {
                // 核心 2: 如果是其他类型的,那么这里会通过 scheduler 创建对应的 Worker
                Scheduler.Worker w = scheduler.createWorker();
                // 核心 3: 然后将观察者 observer 封装成一个 【-->3.3】ObserveOnObserver,然后在执行订阅;
                // source 是【-->6.1】 ObservableCreate 实例;
                source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
            }
        }
    
    • ObserveOnObserver 中的 onNext 通过 schedule() 通过线程池实现在指定线程中执行数据分发:
            @Override
            public void onNext(T t) {
                if (done) {
                    return;
                }
                if (sourceMode != QueueDisposable.ASYNC) {
                    queue.offer(t);
                }
                // 执行线程池调度;
                schedule();
            }
    
            void schedule() {
                if (getAndIncrement() == 0) {
                    worker.schedule(this); // 将自身加入到 worker 中;
                }
            }
    

    刚刚我们说了 ObserveOn 能改变调用后的逻辑所在的线程,这里其实已经能看到原因了。

    按照逻辑,发射器最终会调用 ObserveOnObserver 的 onNext/onComplete 方法,而这个方法会讲 ObserveOnObserver 自身(就是一个 Runnable)放入 worker 中执行调度,后续的操作就会在 worker 所在的线程了(这是一个上游 -> 下游的过程);

    到这里,就能够推断,ObserveOn 是可以调多次的,每次都会将下游的回调逻辑通过装饰器的方式切换到对应的线程;

    7.2.2 ObservableSubscribeOn 的关键点

    • ObservableSubscribeOn 内部会讲 Scheduler 保存下来
    public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> { // 如下:
        final Scheduler scheduler;
    
        public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
            super(source); // 父类的变量;
            this.scheduler = scheduler;
        }
    
    • subscribeActual 方法,这里会实现订阅,用 ObserveOnObserver 封装 observer,同时传入 Scheduler.Worker 作为调度器:
        @Override
        public void subscribeActual(final Observer<? super T> observer) {
            //【-->4.3】将 observer 包装成 SubscribeOnObserver 实例;;
            final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
            // 这里立刻回调;
            observer.onSubscribe(parent);
            //【-->】创建了 SubscribeTask 实例,将 SubscribeOnObserver 保存到内部;;
            // SubscribeTask 是一个 Runnable 实例;;
            //【-->4.3.5】scheduler.scheduleDirect 会返回一个 Disposable 对象。
            // SubscribeOnObserver 会持有该对象的原子引用;;
            parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
        }
    
    • SubscribeTask 本质上就是一个 runnable:
        final class SubscribeTask implements Runnable {
            //【-->4.3】SubscribeOnObserver 实例
            private final SubscribeOnObserver<T> parent;
    
            SubscribeTask(SubscribeOnObserver<T> parent) {
                this.parent = parent;
            }
    
            @Override
            public void run() {
                // 这里的 source 来自 ObservableSubscribeOn
                // 就是 ObservableObserveOn 实例了;
                //【-->3.2】这样就进入了 ObservableObserveOn 的 subscribeActual 方法; 
                source.subscribe(parent);
            }
        }
    

    刚刚我们说了 subscribeOn 能改变调用前的逻辑所在的线程,这里其实已经能看到原因了。

    按照逻辑,这里会将 source.subscribe(parent) 放到 scheduler 指定的线程中执行,而 subscribe 是一个从下游->上游的过程。

    source.subscribe(parent) 会按照订阅流程,向上不断的执行 source[x].subscribe(parent) -> source[x].subscribeActual(parent)

    一直到最终的 ObservableOnSubscribe ,而这个过程是在 scheduler 指定的线程中执行。

    同时,当到 ObservableOnSubscribe 的时候,会通过发射器发送数据,这样又会变成上游 -> 下游,而发生器也是在 ObservableOnSubscribe.subscribe 方法中触发,所以此时依然是在 scheduler 指定的线程中执行。

    最终,执行到最近的 ObserveOn 处,这里才会切换线程,这样下游的调度线程都会在 ObserveOn 方法进行切换了;

    疑问

    上面是先调用 observeOn 再调用 subscribeOn,实际上二者的顺序可以不一样。如果我们反过来呢?

    如果反过来的话只是装饰的对象顺序发生了变化,对于 subscribeOn 和 observeOn 的效果依然是没有变化的。

    这里不多说了,溜了溜了。

    相关文章

      网友评论

        本文标题:Rxjava-订阅和线程切换简单分析

        本文链接:https://www.haomeiwen.com/subject/figenktx.html