美文网首页Android知识
RxJava2 线程切换原理

RxJava2 线程切换原理

作者: 董江鹏 | 来源:发表于2018-06-04 23:04 被阅读13次

    在移动端编程的时候基本上都会需要频繁地切换线程,因为复杂的耗时任务需要放到后台线程里去运行,UI绘制工作又只能在主线程里执行。Android 里一般用Handler实现,iOS 里通过系统提供的任务队列实现,但两者均不够优雅,无可避免地需要将代码包裹起来,Android 里需要用Runnable传递,iOS里则是闭包。如果遇到多次切换线程的情况,代码的缩进层级就会变深,可读性变差。

    还好,ReactiveX 库优雅地解决了这个问题,而且还是跨平台的,RxJavaRxSwift 让移动端的线程切换工作变得优雅起来。

    这篇文章主要试图说清楚RxJava2线程切换的实现过程。

    操作符

    RxJava2 的线程操作符是 subscribeOnobserveOn ,我们先按调用顺序一步步看。以下源码均有简化。

    • subscribeOn
        public final Observable<T> subscribeOn(Scheduler scheduler) {
            return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
        }
    

    subscribeOn 操作符将之前产生的 Observable 和 传入的 Scheduler封装成 ObservableSubscribeOn

    • observeOn
       public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
           return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
       }
    

    observeOn 操作符将之前产生的 Observable 和 传入的 Scheduler封装成 ObservableObserveOn

    • subscribe
        public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
                Action onComplete, Consumer<? super Disposable> onSubscribe) {
            LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);
            subscribe(ls);
            return ls;
        }
    
        public final void subscribe(Observer<? super T> observer) {  
            subscribeActual(observer);    
        }
    
    最后通过`subscribe`操作符设置观察者(`Observer`)来触发整个流程。
    

    流程

    subscribe 会调用上一个Observable(即ObservableObserveOn)的 subscribeActual() 方法。

        protected void subscribeActual(Observer<? super T> observer) {
            if (scheduler instanceof TrampolineScheduler) {
                source.subscribe(observer);
            } else {
                Scheduler.Worker w = scheduler.createWorker();
                source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
            }
        }
    

    此处将observeOn操作符后面的观察者(包括后面所有的操作符和Observer)和其将要运行的线程相关内容(Worker)封装成ObserveOnObserver 。此处的sourceobserveOn操作符之前的内容(切换之前产生的Observable)。

    由此可见,每一次observeOn的使用,都会将后面的观察者(同上)和将要切换的线程内容封装起来。

    我们可以看看ObserveOnObserver的执行过程。

          public void onSubscribe(Disposable s) {
                if (DisposableHelper.validate(this.s, s)) {
                    this.s = s;
                    if (s instanceof QueueDisposable) {
                        @SuppressWarnings("unchecked")
                        QueueDisposable<T> qd = (QueueDisposable<T>) s;
    
                        int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);
    
                        if (m == QueueDisposable.SYNC) {
                            sourceMode = m;
                            queue = qd;
                            done = true;
                            actual.onSubscribe(this);
                            schedule();
                            return;
                        }
                        if (m == QueueDisposable.ASYNC) {
                            sourceMode = m;
                            queue = qd;
                            actual.onSubscribe(this);
                            return;
                        }
                    }
    
                    queue = new SpscLinkedArrayQueue<T>(bufferSize);
    
                    actual.onSubscribe(this);
                }
            }
    
            void schedule() {
                if (getAndIncrement() == 0) {
                    worker.schedule(this);
                }
            }
    
            public void run() {
                if (outputFused) {
                    drainFused();
                } else {
                    drainNormal();
                }
            }
    
            ......
    

    ObserveOnObserver里的数据处理都将在指定的Scheduler里执行,所以到这里可以证明,每次observeOn都会切换后面内容的执行线程。

    再继续往上调用,回到前面的内容

    source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
    

    假设该source是由subscribeOn产生的, 此处将会调用ObservableSubscribeOn里的subscribeActual() 方法。

        public void subscribeActual(final Observer<? super T> s) {
            final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
            s.onSubscribe(parent);
            parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
        }
    
        final class SubscribeTask implements Runnable {
            private final SubscribeOnObserver<T> parent;
    
            SubscribeTask(SubscribeOnObserver<T> parent) {
                this.parent = parent;
            }
    
            @Override
            public void run() {
                source.subscribe(parent);
            }
        }
    

    上一个操作产生的source(即Observable)的subscribe操作将在指定的Scheduler里执行, 每一次subscribeOn的调用都会在之前的Observable上封装一层,但数据的发射由最里层的Observable实现,即在第一个ObservableSubscribeOn封装里执行。

    我们以最简单的 ObservableJust 举例,subscribeActual() 必然在第一个subscribeOn里的Scheduler里执行。

    public final class ObservableJust<T> extends Observable<T> implements ScalarCallable<T> {
    
        private final T value;
        public ObservableJust(final T value) {
            this.value = value;
        }
    
        @Override
        protected void subscribeActual(Observer<? super T> s) {
            ScalarDisposable<T> sd = new ScalarDisposable<T>(s, value);
            s.onSubscribe(sd);
            sd.run();
        }
    
        @Override
        public T call() {
            return value;
        }
    }
    
    ......
    
          public void run() {
                if (get() == START && compareAndSet(START, ON_NEXT)) {
                    observer.onNext(value);
                    if (get() == ON_NEXT) {
                        lazySet(ON_COMPLETE);
                        observer.onComplete();
                    }
                }
            }
    

    小结

    文章从源码层面解释了RxJava2线程切换的原理,以及 subscribeOnobserveOn 两个操作符生效的场景。
    整个流程还是很复杂,我先是读了几遍源码,但是在记忆里找起来总是很混乱,便写了个简单的demo,用断点调试工具一步步地走,才慢慢豁然开朗。如果看不明白,建议使用断点调试工具。

    相关文章

      网友评论

        本文标题:RxJava2 线程切换原理

        本文链接:https://www.haomeiwen.com/subject/zqeusftx.html