美文网首页
Rxjava2.1 线程切换原理解析

Rxjava2.1 线程切换原理解析

作者: innovatorCL | 来源:发表于2019-08-28 17:30 被阅读0次

    一、前提说明

    本文是在 Rxjava 2.1 的基础上进行的,目前只对 Rxjava 进行解析,未搭配 Retrofit 食用,如果想看 Rxjava + Retrofit 源码解析,请移步 Retrofit 2.1 + Rxjava 源码解析(一)

    二、Rxjava 使用栗子

    new Thread("子线程"){
              @Override
              public void run() {
                  Observable.create(new ObservableOnSubscribe<String>() {
                              @Override
                              public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                                  Log.e(TAG, "Observable#subscribe(): 所在线程为 " + Thread.currentThread().getName());
                                  emitter.onNext("1");
                                  emitter.onComplete();
                              }
                          })
    //              .subscribeOn(Schedulers.io())
                          .observeOn(Schedulers.io())
                          .subscribe(new Observer<String>() {
                              @Override
                              public void onSubscribe(Disposable d) {
                                  Log.e(TAG, "observer#onSubscribe(): 所在线程为 " + Thread.currentThread().getName());
                              }
    
                              @Override
                              public void onNext(String s) {
                                  Log.e(TAG, "observer#onNext(): 所在线程为 " + Thread.currentThread().getName());
                              }
    
                              @Override
                              public void onError(Throwable e) {
                              }
    
                              @Override
                              public void onComplete() {
                                  Log.e(TAG, "observer#onComplete(): 所在线程为 " + Thread.currentThread().getName());
                              }
                          });
              }
          }.start();
    

    输出结果:

    E/Rxjava: observer#onSubscribe(): 所在线程为 子线程
    E/Rxjava: Observable#subscribe(): 所在线程为 子线程
    E/Rxjava: observer#onNext(): 所在线程为 RxCachedThreadScheduler-1
    E/Rxjava: observer#onComplete(): 所在线程为 RxCachedThreadScheduler-1
    

    Rxjava2.1订阅流程解析 中我们已经分析了 Observable.create() 的过程,就是构建一个 ObservableCreate 对象,ObservableCreate 是 Observable 的子类。

    由上文可以知道,当调用了 subscribe() 后,会执行以下顺序:Observable.subscribe(Observer) -> ObservableCreate.subscribeActual(Observer) -> Observer#onSubscribe(),所以可以知道 Observer#onSubscribe() 的执行线程是当前线程,即调用了 subscribe()的线程。

    三、Observable.observeOn(Schedulers.io())

    从上面栗子可以看到,如果我们只是调用了 observeOn(Schedulers.io()),这样影响的是 observer 的 onNext() 和 onComplete(),对于 ObservableOnSubscribe#subscribe() 和 Observer#onSubscribe() 是没有影响的。

    我们看看 Observable.observeOn(Schedulers.io()) 的源码:

    public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
            //删除无关紧要的代码
            //这里的 this 是 ObservableCreate 对象
            return new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize);
        }
    

    可以看到就是将传入的 ObservableCreate 对象封装进了 ObservableObserveOn 对象中,可以肯定的是 ObservableObserveOn 也是 Observable 的子类。

    我们从上文得知,接下来会调用 observable.subscribe(observer) 的时候会跳转调用 Observable 子类的 ObservableObserveOn.subscribeActual(observer) 方法。这其实是用了静态工厂模式。

    public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
    
        final Scheduler scheduler;
        final boolean delayError;
        final int bufferSize;
        
        public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
            super(source);
            this.scheduler = scheduler;
            this.delayError = delayError;
            this.bufferSize = bufferSize;
        }
    
        @Override
        protected void subscribeActual(Observer<? super T> observer) {
             // 如果传入的 scheduler 是 Scheduler.trampoline() 的情况
            // 该线程的意义是传入当前线程,也就是不做任何线程切换操作
            if (scheduler instanceof TrampolineScheduler) {
                source.subscribe(observer);
            } else {
                Scheduler.Worker w = scheduler.createWorker();
                //这里的 source 是 ObservableCreate 对象
                source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
            }
        }
    }
    

    当需要切换线程的时候,可以看到将传进来的 ObservableCreate 对象进行了订阅,只不过观察者又被封装成了 ObserveOnObserver 对象。这样就会执行 ObservableCreate#subscribeActual()

    public final class ObservableCreate<T> extends Observable<T> {
        final ObservableOnSubscribe<T> source;
    
        public ObservableCreate(ObservableOnSubscribe<T> source) {
            this.source = source;
        }
    
        @Override
        protected void subscribeActual(Observer<? super T> observer) {
            //这里的 observer 就是 ObserveOnObserver 对象
            CreateEmitter<T> parent = new CreateEmitter<T>(observer);
            //这里的 observer 就是 ObserveOnObserver 对象
            observer.onSubscribe(parent);
    
            try {
                //这里的额 source 就是我们在最外层创建的 ObservableOnSubscribe 对象
                source.subscribe(parent);
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                parent.onError(ex);
            }
        }
    }
    

    这里可以看到使用了 observeOn(Schedulers.io()) 方法,但是 Observer#onSubscribe(Disposable d) 并没有切换线程,仍在当前线程中运行。也就是 ObserveOnObserver.onSubscribe() 是运行在当前线程的。我们看看这个方法做了什么:

    static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T> implements Observer<T>, Runnable {
    
            private static final long serialVersionUID = 6576896619930983584L;
            //真正的观察者,最外层我们创建的 observer
            final Observer<? super T> actual;
            final Scheduler.Worker worker;
    
            Disposable s;
    
            ......
            
            ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
                this.actual = actual;
                this.worker = worker;
            }
    
            @Override
            public void onSubscribe(Disposable s) {
                if (DisposableHelper.validate(this.s, s)) {
                    this.s = s;
                    if (s instanceof QueueDisposable) {
                        @SuppressWarnings("unchecked")
                        QueueDisposable<T> qd = (QueueDisposable<T>) s;
    
                        int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);
    
                        if (m == QueueDisposable.SYNC) {
                            sourceMode = m;
                            queue = qd;
                            done = true;
                            //执行真正的被观察者 Observer(最外层我们创建的 observer)#onSubscribe()
                            actual.onSubscribe(this);
                            schedule();
                            return;
                        }
                        if (m == QueueDisposable.ASYNC) {
                            sourceMode = m;
                            queue = qd;
                            //执行真正的被观察者 Observer(最外层我们创建的 observer)#onSubscribe()
                            actual.onSubscribe(this);
                            return;
                        }
                    }
    
                    queue = new SpscLinkedArrayQueue<T>(bufferSize);
                    //执行真正的被观察者 Observer(最外层我们创建的 observer)#onSubscribe()
                    actual.onSubscribe(this);
                }
            }
    
            @Override
            public void onNext(T t) {
                if (done) {
                    return;
                }
    
                if (sourceMode != QueueDisposable.ASYNC) {
                    queue.offer(t);
                }
                schedule();
            }
    
            @Override
            public void onError(Throwable t) {
                if (done) {
                    RxJavaPlugins.onError(t);
                    return;
                }
                error = t;
                done = true;
                schedule();
            }
    
            @Override
            public void onComplete() {
                if (done) {
                    return;
                }
                done = true;
                schedule();
            }
    
            @Override
            public void dispose() {
                if (!cancelled) {
                    cancelled = true;
                    s.dispose();
                    worker.dispose();
                    if (getAndIncrement() == 0) {
                        queue.clear();
                    }
                }
            }
        
        void schedule() {
                if (getAndIncrement() == 0) {
                    //this 就是 ObserveOnObserver 对象
                    worker.schedule(this);
                }
            }
    }
    

    可以看到 ObserveOnObserver#onSubscribe(Disposable s) 中一定会调用 actual.onSubscribe(this);,其中这个 this 就是 ObserveOnObserver 对象,也就是让我们最外层的 observer 订阅了 ObserveOnObserver。

    可以看到在 RxJava 中运用的操作符都会在内部创建一个 Observable 和 Observer,所以外界使用起来和简单,但是里面运行的原理倒是挺复杂的,容易让人混淆。

    运行完 ObserveOnObserver#onSubscribe(Disposable s) 后,就轮到了 source.subscribe(parent);(这里的额 source 就是我们在最外层创建的 ObservableOnSubscribe 对象),也就是说我们的 ObservableOnSubscribe#subscribe(emitter) 运行在当前线程。到这里的分析都很符合我们打印的结果。

    而我们在最外层,只是让发射器 emitter 简单地发送了一个 Next 事件。这个事件会被谁接收呢?

    static final class CreateEmitter<T> extends AtomicReference<Disposable>
        implements ObservableEmitter<T>, Disposable {
        
            private static final long serialVersionUID = -3434801548987643227L;
    
            final Observer<? super T> observer;
    
            CreateEmitter(Observer<? super T> observer) {
                //这里的 observer 就是 ObserveOnObserver 对象
                this.observer = observer;
            }
    
            @Override
            public void onNext(T t) {
                //这里的 observer 就是 ObserveOnObserver 对象
                if (!isDisposed()) {
                    observer.onNext(t);
                }
            }
            
            ......
    }
    

    我们从之前调用到 ObservableCreate#subscribeActual() 可以知道,当时传进来的 parent 是 ObserveOnObserver 对象。所以发射器 emitter 发射的事件会被 ObserveOnObserver 接收。

    可以看到 ObserveOnObserver.onNext() 中最后执行了 schedule(),也就是在这里进行了线程切换的操作。

    由于我们传入的 Scheduler 是 IO 线程,我们看看这个 IO Schedule 的 worker.schedule(this)

    一路追踪,终于找到了这个 IOScheduler 的庐山真面目:

    public final class IoScheduler extends Scheduler {
    
    private static final String WORKER_THREAD_NAME_PREFIX = "RxCachedThreadScheduler";
        static final RxThreadFactory WORKER_THREAD_FACTORY;
    
        private static final String EVICTOR_THREAD_NAME_PREFIX = "RxCachedWorkerPoolEvictor";
        static final RxThreadFactory EVICTOR_THREAD_FACTORY;
    
        private static final long KEEP_ALIVE_TIME = 60;
        private static final TimeUnit KEEP_ALIVE_UNIT = TimeUnit.SECONDS;
    
        static final ThreadWorker SHUTDOWN_THREAD_WORKER;
        final ThreadFactory threadFactory;
        final AtomicReference<CachedWorkerPool> pool;
        
        public Worker createWorker() {
            return new EventLoopWorker(pool.get());
        }
        
        ......
        
        static final class EventLoopWorker extends Scheduler.Worker {
            private final CompositeDisposable tasks;
            private final CachedWorkerPool pool;
            private final ThreadWorker threadWorker;
    
            final AtomicBoolean once = new AtomicBoolean();
    
            EventLoopWorker(CachedWorkerPool pool) {
                this.pool = pool;
                this.tasks = new CompositeDisposable();
                this.threadWorker = pool.get();
            }
    
            @Override
            public void dispose() {
                if (once.compareAndSet(false, true)) {
                    tasks.dispose();
    
                    // releasing the pool should be the last action
                    pool.release(threadWorker);
                }
            }
    
            @Override
            public boolean isDisposed() {
                return once.get();
            }
    
            @NonNull
            @Override
            public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
                if (tasks.isDisposed()) {
                    // don't schedule, we are unsubscribed
                    return EmptyDisposable.INSTANCE;
                }
                //放到线程池中执行
                return threadWorker.scheduleActual(action, delayTime, unit, tasks);
            }
        }
    
        ......
    }
    

    至此,你可以看到调用了 observerOn() 方法的全过程,只是会改变观察者 observer 的 onNext()、onComplete() 方法的运行线程,不会改变被观察者 Observable 的运行线程。

    四、observeOn() 切换线程原理小结

    看完整个过程,我们知道当我们使用 observeOn(Schedulers.io())的时候,其实 Rxjava 在内部帮我们创建封装了若干个中间对象的 Observable 和 Observer。然后将这个订阅操作放在 Rxjava 的线程池进行,达到切换线程的功能。

    被观察者 Observable 的变化过程:Observable ==> ObservableCreate ==> ObserbvableObserveOn。

    观察者 Observer 的变化过程:Observer ==> ObserveOnObserver,然后传到 ObservableEmitter<String> emitter 里面,作为发射器的 observer 成员变量。

    总之,Observable#observeOn(Scheduler) 的实现原理在于将目标 Observer 的 onNext(T)/onError(Throwable)/onComplete() 置于指定线程中运行。

    五、subscribeOn() 栗子

    new Thread("子线程"){
              @Override
              public void run() {
                  Observable
                          .create(new ObservableOnSubscribe<String>() {
                              @Override
                              public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                                  Log.e(TAG, "ObservableOnSubscribe#subscribe(): 所在线程为 " + Thread.currentThread().getName());
                                  emitter.onNext("1");
                                  emitter.onComplete();
                              }
                          })
                          .subscribeOn(Schedulers.io())
    //                      .observeOn(Schedulers.io())
                          .subscribe(new Observer<String>() {
                              @Override
                              public void onSubscribe(Disposable d) {
                                  Log.e(TAG, "observer#onSubscribe(): 所在线程为 " + Thread.currentThread().getName());
                              }
    
                              @Override
                              public void onNext(String s) {
                                  Log.e(TAG, "observer#onNext(): 所在线程为 " + Thread.currentThread().getName());
                              }
    
                              @Override
                              public void onError(Throwable e) {
                              }
    
                              @Override
                              public void onComplete() {
                                  Log.e(TAG, "observer#onComplete(): 所在线程为 " + Thread.currentThread().getName());
                              }
                          });
              }
          }.start();
    

    输出结果:

    E/Rxjava: observer#onSubscribe(): 所在线程为 子线程
    E/Rxjava: ObservableOnSubscribe#subscribe(): 所在线程为 RxCachedThreadScheduler-2
    E/Rxjava: observer#onNext(): 所在线程为 RxCachedThreadScheduler-2
    E/Rxjava: observer#onComplete(): 所在线程为 RxCachedThreadScheduler-2
    

    六、ObservableCreate.subscribeOn()

    由上文可以,Observable.create() 会生成一个 ObservableCreate 对象。我们看看 ObservableCreate.subscribeOn()

    public final Observable<T> subscribeOn(Scheduler scheduler) {
            //过滤无关紧要的代码
            //this 是 ObservableCreate 对象
            return new ObservableSubscribeOn<T>(this, scheduler);
        }
    

    可以看到将 ObservableCreate 对象封装成了 ObservableSubscribeOn 对象,然后就会执行 ObservableSubscribeOn#subscribeActual()

    public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
        final Scheduler scheduler;
    
        //这里的 source 是 ObservableCreate 对象,scheduler 是 IoScheduler
        public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
            super(source);
            this.scheduler = scheduler;
        }
    
        @Override
        public void subscribeActual(final Observer<? super T> s) {
            //s 是最外层的 observer
            final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
            
            //调用 observer#onSubscribe
            s.onSubscribe(parent);
    
            parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
        }
        
        ......
    }
    

    可以看到 observer#onSubscribe() 仍在在当前线程中执行,之后的 observer 和 ObservableOnSubscribe 的方法都被线程切换类 IoScheduler 切换到了其他线程。

    我们看看 IoScheduler 的 scheduler.scheduleDirect(new SubscribeTask(parent)))

    final class SubscribeTask implements Runnable {
            //这个 parent 就是 SubscribeOnObserver
            private final SubscribeOnObserver<T> parent;
    
            SubscribeTask(SubscribeOnObserver<T> parent) {
                this.parent = parent;
            }
    
            @Override
            public void run() {
                //source 就是 ObservableCreate 对象
                //parent 就是 SubscribeOnObserver 对象
                source.subscribe(parent);
            }
        }
        
        static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
    
            private static final long serialVersionUID = 8094547886072529208L;
            final Observer<? super T> actual;
    
            final AtomicReference<Disposable> s;
    
            SubscribeOnObserver(Observer<? super T> actual) {
                //actual 就是最外层的 observer 
                this.actual = actual;
                this.s = new AtomicReference<Disposable>();
            }
    
            @Override
            public void onSubscribe(Disposable s) {
                DisposableHelper.setOnce(this.s, s);
            }
    
            @Override
            public void onNext(T t) {
                actual.onNext(t);
            }
    
            @Override
            public void onError(Throwable t) {
                actual.onError(t);
            }
    
            @Override
            public void onComplete() {
                actual.onComplete();
            }
    
            @Override
            public void dispose() {
                DisposableHelper.dispose(s);
                DisposableHelper.dispose(this);
            }
            
            @Override
            public boolean isDisposed() {
                return DisposableHelper.isDisposed(get());
            }
    
            void setDisposable(Disposable d) {
                DisposableHelper.setOnce(this, d);
            }
        }
    

    可以看到将最外层的 observer 包装成 SubscribeOnObserver 对象,然后包装成一个 SubscribeTask(可以执行的任务)。当在线程池中被执行的时候,会执行 SubscribeTask#run()

    我们再看 IoSchedule#scheduleDirect(subscribeTask)

    在 IoSchedule 的父类 Schedule 中找到一个方法:

    public abstract class Scheduler {
        public Disposable scheduleDirect(@NonNull Runnable run) {
            return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
        }
        
        public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
            final Worker w = createWorker();
    
            final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    
            DisposeTask task = new DisposeTask(decoratedRun, w);
            //其实就是 EventLoopWorker#schedule()
            w.schedule(task, delay, unit);
    
            return task;
        }
    }
    

    又回到了 IOScheduler 创建的 EventLoopWorker 中:

    static final class EventLoopWorker extends Scheduler.Worker {
            private final CompositeDisposable tasks;
            private final CachedWorkerPool pool;
            private final ThreadWorker threadWorker;
    
            final AtomicBoolean once = new AtomicBoolean();
    
            EventLoopWorker(CachedWorkerPool pool) {
                this.pool = pool;
                this.tasks = new CompositeDisposable();
                this.threadWorker = pool.get();
            }
    
            @Override
            public void dispose() {
                if (once.compareAndSet(false, true)) {
                    tasks.dispose();
    
                    // releasing the pool should be the last action
                    pool.release(threadWorker);
                }
            }
    
            @Override
            public boolean isDisposed() {
                return once.get();
            }
    
            @NonNull
            @Override
            public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
                if (tasks.isDisposed()) {
                    // don't schedule, we are unsubscribed
                    return EmptyDisposable.INSTANCE;
                }
                
                //最终放进线程池中执行任务
                return threadWorker.scheduleActual(action, delayTime, unit, tasks);
            }
        }
    
    static final class ThreadWorker extends NewThreadWorker {
            private long expirationTime;
    
            ThreadWorker(ThreadFactory threadFactory) {
                super(threadFactory);
                this.expirationTime = 0L;
            }
    
            public long getExpirationTime() {
                return expirationTime;
            }
    
            public void setExpirationTime(long expirationTime) {
                this.expirationTime = expirationTime;
            }
        }
    
    public class NewThreadWorker extends Scheduler.Worker implements Disposable {
        private final ScheduledExecutorService executor;
    
        volatile boolean disposed;
    
        public NewThreadWorker(ThreadFactory threadFactory) {
            executor = SchedulerPoolFactory.create(threadFactory);
        }
        
        ......
        
        public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
            Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    
            ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
    
            if (parent != null) {
                if (!parent.add(sr)) {
                    return sr;
                }
            }
    
            Future<?> f;
            try {
                if (delayTime <= 0) {
                    //放进线程池中执行
                    f = executor.submit((Callable<Object>)sr);
                } else {
                    //放进线程池中执行
                    f = executor.schedule((Callable<Object>)sr, delayTime, unit);
                }
                sr.setFuture(f);
            } catch (RejectedExecutionException ex) {
                if (parent != null) {
                    parent.remove(sr);
                }
                RxJavaPlugins.onError(ex);
            }
    
            return sr;
        }
        
        ......
    }
    

    最终在线程池中执行了我们的 SubscribeTask#run()。其实从 scheduleActual() 和 subscribeActual() 的命名方式可以看出,Rxjava 很多地方都用到了静态工厂模式,都是父类提供抽象方法,具体的子类根据需要实现不同的逻辑,这个就很灵活了。

    我们再看看 SubscribeTask#run() 干了什么:

    final class SubscribeTask implements Runnable {
            //这个 parent 就是 SubscribeOnObserver
            private final SubscribeOnObserver<T> parent;
    
            SubscribeTask(SubscribeOnObserver<T> parent) {
                this.parent = parent;
            }
    
            @Override
            public void run() {
                //source 就是 ObservableCreate 对象
                //parent 就是 SubscribeOnObserver 对象
                source.subscribe(parent);
            }
        }
    

    **那其实就是 ObservableCreate.subscribe(SubscribeOnObserver),这就又跳到了我们熟悉的 ObservableCreate.subscribeActual() 方法中了。

    public final class ObservableCreate<T> extends Observable<T> {
        final ObservableOnSubscribe<T> source;
    
        //source 是最外层的 ObservableOnSubscribe 对象
        public ObservableCreate(ObservableOnSubscribe<T> source) {
            this.source = source;
        }
    
        @Override
        protected void subscribeActual(Observer<? super T> observer) {
            //observer 是 SubscribeOnObserver 对象,里面包含最外层的 observer 对象
            CreateEmitter<T> parent = new CreateEmitter<T>(observer);
            //这里 SubscribeOnObserver 只是简单地 set 了一个引用
            observer.onSubscribe(parent);
    
            try {
                //其实就是 ObservableOnSubscribe.subscribe(SubscribeOnObserver);
                //此时已经运行在 Rxjava 的线程池中
                source.subscribe(parent);
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                parent.onError(ex);
            }
        }
        
        ......
    }
    

    在这里会先执行 observer (SubscribeOnObserver)的 onSubscribe() 方法,这个方法就 set 了一个引用,可以先忽略。接下来会调用 ObservableOnSubscribe.subscribe(SubscribeOnObserver)

    Observable.create(new ObservableOnSubscribe<String>() {
                              @Override
                              public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                                  Log.e(TAG, "ObservableOnSubscribe#subscribe(): 所在线程为 " + Thread.currentThread().getName());
                                  emitter.onNext("1");
                                  emitter.onComplete();
                              }
                          });
    

    我们在最外层只发送了一个 Next 事件,根据 CreateEmitter<T> 类的源码:

    public void onNext(T t) {
               
                if (!isDisposed()) {
                    //observer 就是 SubscribeOnObserver 
                    observer.onNext(t);
                }
            }
    

    SubscribeOnObserver.onNext() 会触发:

    static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
    
            private static final long serialVersionUID = 8094547886072529208L;
            final Observer<? super T> actual;
    
            final AtomicReference<Disposable> s;
    
            SubscribeOnObserver(Observer<? super T> actual) {
                //最外层的 observer 
                this.actual = actual;
                this.s = new AtomicReference<Disposable>();
            }
    
            @Override
            public void onSubscribe(Disposable s) {
                DisposableHelper.setOnce(this.s, s);
            }
    
            @Override
            public void onNext(T t) {
                //调用最外层的 observer#onNext()
                actual.onNext(t);
            }
    
            @Override
            public void onError(Throwable t) {
                actual.onError(t);
            }
    
            @Override
            public void onComplete() {
                //调用最外层的 observer#onComplete()
                actual.onComplete();
            }
    
            @Override
            public void dispose() {
                DisposableHelper.dispose(s);
                DisposableHelper.dispose(this);
            }
    
            @Override
            public boolean isDisposed() {
                return DisposableHelper.isDisposed(get());
            }
    
            void setDisposable(Disposable d) {
                DisposableHelper.setOnce(this, d);
            }
        }
    

    所以,我们最外层的 observer 的 onNext() 和 onComplete() 会运行在 Rxjava 的线程池的线程中。

    至此,subscribeOn(Schedulers.io()) 的过程分析完毕,subscribeOn(Schedulers.io()) 会改变观察者 observer 的 onNext()、onComplete() 方法的运行线程,也会改变被观察者 Observable 的运行线程。

    七、subscribeOn() 切换线程原理小结

    看完整个过程,我们知道当我们使用 subscribeOn(Schedulers.io())的时候,其实跟上面的 observeOn(Schedulers.io()) 过程差不多,Rxjava 帮我们创建了若干个中间层的 Observable 和 Observer,然后将这个订阅操作放在 Rxjava 的线程池进行,达到切换线程的功能。

    被观察者 Observable 的变化过程:Observable ==> ObservableCreate ==> ObserbvableSubscribeOn。

    观察者 Observer 的变化过程:Observer ==> SubscribeOnObserver,然后传到 ObservableEmitter<String> emitter 里面,作为发射器的 observer 成员变量。

    总之,Observable#subscribeOn(Scheduler) 的实现原理在于将目标 Observer 的 onNext(T)/onError(Throwable)/onComplete() 和 ObservableOnSubscribe 的 subscribe(T) 置于指定线程中运行。

    八、subscribeOn() 和 observeOn(Schedulers.io()) 一起使用

    这两个 api 一起使用其实也不会有什么很大的变化,就是 observeOn() 会影响 Observer 的 onNext(T)/onError(Throwable)/onComplete() 运行线程,而 subscribeOn() 会影响 ObservableOnSubscribe 的 subscribe(T) 运行线程。

    相关文章

      网友评论

          本文标题:Rxjava2.1 线程切换原理解析

          本文链接:https://www.haomeiwen.com/subject/hyumectx.html