美文网首页Android源码分析
RxJava2源码分析-线程调度-subscribeOn和obs

RxJava2源码分析-线程调度-subscribeOn和obs

作者: o动感超人o | 来源:发表于2018-09-20 13:57 被阅读8次

    先上这两个对应的类的代码(仅看关键的方法):
    Observable.subscribeOn方法创建的类ObservableSubscribeOn

    public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
        @Override
        public void subscribeActual(final Observer<? super T> s) {
            final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
    
            s.onSubscribe(parent);
    
            parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
        }
        final class SubscribeTask implements Runnable {
            private final SubscribeOnObserver<T> parent;
    
            SubscribeTask(SubscribeOnObserver<T> parent) {
                this.parent = parent;
            }
    
            @Override
            public void run() {
                source.subscribe(parent);
            }
        }
    }
    

    Observable.observeOn方法创建的类ObservableObserveOn

    public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
        @Override
        protected void subscribeActual(Observer<? super T> observer) {
            if (scheduler instanceof TrampolineScheduler) {
                source.subscribe(observer);
            } else {
                Scheduler.Worker w = scheduler.createWorker();
    
                source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
            }
        }
    }
    

    Observable.create创建的普通的ObservableCreate

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);
        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    
        static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
        implements Observer<T>, Runnable {
            @Override
            public void onNext(T t) {
                if (done) {
                    return;
                }
    
                if (sourceMode != QueueDisposable.ASYNC) {
                    queue.offer(t);
                }
                schedule();
            }
            void schedule() {
                if (getAndIncrement() == 0) {
                    worker.schedule(this);
                }
            }
            @Override
            public void run() {
                if (outputFused) {
                    drainFused();
                } else {
                    drainNormal();
                }
            }
        }
    }
    

    可以看到,普通的没有线程切换的Observable,在subscribeActual方法里直接执行了source.subscribe(parent)方法。

    ObservableSubscribeOnsubscribeActual方法里,可以看到source. subscribe发生在指定的线程里,所以上游在指定线程执行subscribe方法

    ObservableObserveOnsubscribeActual方法里,可以看到source. subscribe仍然在当前线程,在run方法里有判断如果是异步的就把下游observer.onNextobserver.onError等方法在指定线程里执行。

    由上面可以看出subscribeOn影响的是上游,observeOn影响的是下游。

    还有一种情况,如果有多个subscribeOn或者多个observeOn的时候,每句代码生成的Observable执行在哪个线程呢?这里先说一下结论,每个Observable在下一句离它最近的subscribeOn指定的线程执行,每个Observable在上一句离它最近的observeOn指定的线程执行。
    为什么会是这个结论呢?
    因为订阅(Observable.subscribe)发生的顺序是从下游到上游,所以执行subscribeOn的时候

    --未写完--

    相关文章

      网友评论

        本文标题:RxJava2源码分析-线程调度-subscribeOn和obs

        本文链接:https://www.haomeiwen.com/subject/mkwgnftx.html