美文网首页Android开发Android开发经验谈Android技术知识
Android RxJava2.x(二)线程调度和操作符源码分析

Android RxJava2.x(二)线程调度和操作符源码分析

作者: 程序员三千_ | 来源:发表于2020-04-19 14:06 被阅读0次

Android RxJava2.x(一)整体框架分析

使用过RxJava的人都知道,方便的Api链式调用外,还有一个非常好用的地方,就是线程调度和操作符的使用,我们来举个例子

    Observable.just("test")   
                //指定了被观察者执行的线程环境为io线程
                .subscribeOn(Schedulers.io())
                //使用map操作来完成类型转换
                .map(new Function<String,String>(){
                    @Override
                    public String apply(String s) throws Exception {
                        //这里执行一个耗时操作,比如IO操作
                        return s + "map";
                    }
                })
                //将后面执行的线程环境切换为主线程
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(
                        new Consumer<String>() {
                            @Override
                            public void accept(String s) throws Exception {
                                Log.i("xiaosanye","s: " + s);
                            }
           });
  • 实际上在使用map操作时,new Function() 就对应了类型的转变的方向。在apply()方法中,输入的是字符串s,返回的是字符串s+“map”,当然我们也可以进行类型转化,比如传入原类型String返回转换后的类型Bitmap。
  • 如果在map操作时,执行的是耗时操作的化,比如网络请求、IO操作,这时候我们就要把这个操作切换到子线程去执行,执行完成后,再切换回UI主线程,如果没有使用RxJava肯定得使用各种callback函数,而在RxJava中,通过observeOn和subscribeOn链式操作很方便的实现了线程切换。

1、源码分析observable.subscribeOn和observable.observeOn的内部实现

Observable#observable.subscribeOn
=>return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
=> ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T>
=>AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T>

     @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }

observable.subscribeOn通过return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler))方法,将this也就是observable本身传进去,最终返回一个ObservableSubscribeOn对象,ObservableSubscribeOn继承自AbstractObservableWithUpstream
AbstractObservableWithUpstream继承自Observable并且实现HasUpstreamObservableSource接口,
所以ObservableSubscribeOn对象本身是一个被观察者,
我们我们可以得出结论,调用observable.subscribeOn最后实际上是返回了一个新的被观察者ObservableSubscribeOn

我们再看observable.observeOn

 @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }

Observable#observable.observeOn
=>return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize))
=>ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T>
=>AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T>

同理调用observable.observeOn通过return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize))方法,也将this也就是observable本身传进去,最终返回的也是一个新的被观察者ObservableObserveOn

由上一篇文章我们知道,我们在Observable.create的时候返回一个被观察者ObservableCreate对象,所以我们创建observable的时候实际上返回的是ObservableCreate对象。

  • 综上代码分析,其实不管是Observable#observable.subscribeOn还是Observable#observable.observeOn最终都是将observable传进去(也就是ObservableCreate对象),经过装饰,返回了一个新的被观察者对象而已,如下图


    image.png
image.png

所以最后,我们Observable.subscribe订阅的时候实际上调用的就是经过装饰的最外层ObservableObserveOn

细心的小伙伴肯定也看到了,其实在它们各自的subscribeActual方法里也会对观察者对象进行一次装饰
ObservableObserveOn#subscribeActual

  @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();

            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }

ObservableSubscribeOn#subscribeActual


 @Override
    public void subscribeActual(final Observer<? super T> observer) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);

        observer.onSubscribe(parent);

        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }

最后返回对应的观察者对象ObserveOnObserver、SubscribeOnObserver

最后还有一点要注意的是:

  • Observable.subscribeOn():只能指定一次,如果指定多次则以第一次为准

  • Observable.observeOn():可指定多次,每次指定完都在下一步生效。

那为什么会这样呢?我们看到它们各种的subscribeActual方法里,observeOn是每次都会new一个ObserveOnObserver,ObserveOnObserver里都会传入一个observer和Scheduler.Worker,而subscribeOn只是调用自身的scheduler.scheduleDirect方法。

  • 结论:SubscribeOn这个操作符指定的是Observable自身在哪个调度器上执行,而且跟调用的位置没有关系。而ObservableOn则是指定一个观察者在哪个调度器上观察这个Observable,当每次调用了ObservableOn这个操作符时,之后都会在选择的调度器上进行观察,直到再次调用ObservableOn切换了调度器。

2、源码分析操作符Map的内部实现

Observable#map

   @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
        ObjectHelper.requireNonNull(mapper, "mapper is null");
        return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
    }

经过上面分析,这段代码很熟悉了把,把this也就是Observable对象和mappr(我们自定义的函数fuction)传进去,最终肯定是返回一个ObservableMap被观察者对象的,我们再点进去ObservableMap看看,


public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    final Function<? super T, ? extends U> function;

    public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
        super(source);
        this.function = function;
    }

    @Override
    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));
    }

    static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
        final Function<? super T, ? extends U> mapper;

        MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
            super(actual);
            this.mapper = mapper;
        }

        @Override
        public void onNext(T t) {
           if (done) {
                return;
            }

            if (sourceMode != NONE) {
                downstream.onNext(null);
                return;
            }

            U v;

            try {
                v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
            } catch (Throwable ex) {
                fail(ex);
                return;
            }
            downstream.onNext(v);
        }
         ....省略部分代码
}

我们看到在subscribeActual方法里有一句代码source.subscribe(new MapObserver<T, U>(t, function));这句就是RxJava2.x所有操作符的核心代码了。
source就是我们上游传进来的被观察者,然后调用source.subscribe把function传进去,我们知道在被观察者subscribe完,肯定是走下面的onNext方法了,我们看到onNext里的一句关键代码mapper.apply(t),这句就是我们实际转变的地方,也就是我们在外层实现的apply方法。

在RxJava2.x中,所有操作符的核心原理,都是继承AbstractObservableWithUpstream,在subscribeActual方法的 source.subscribe里实现自己的核心原理,如果我们自己想自定义操作符的话其实也可以继承AbstractObservableWithUpstream然后实现subscribeActual来完成。(不过基本RxJava里的操作符都够用了)

相关文章

网友评论

    本文标题:Android RxJava2.x(二)线程调度和操作符源码分析

    本文链接:https://www.haomeiwen.com/subject/nretbhtx.html