美文网首页
Rxjava之Observable变换原理浅谈

Rxjava之Observable变换原理浅谈

作者: 曾逸111 | 来源:发表于2019-07-26 21:56 被阅读0次

大多数人都会使用Rxjava,但是使用要知其然,知其所以然,今天讲讲Rxjava的Observable变换原理。

举个栗子🌰

先看一段代码:

        Observable.create<Int> {
            it.onNext(1)
            it.onNext(2)
            it.onNext(3)
            it.onComplete()
        }
            .map{item->
                item * item }
            .flatMap { item-> ObservableSource<Int> {
                it.onNext(item * item)
                println(Thread.currentThread().name)
                it.onComplete()
            } }
            .subscribeOn(Schedulers.io())
            .observeOn(Schedulers.io())
            .subscribe(object : Observer<Int> {
                override fun onComplete() {
                    println("I'm done")
                }

                override fun onSubscribe(d: Disposable) {
                    println("I'm being Subscribe")
                }

                override fun onNext(t: Int) {
                    println(t)
                    println(Thread.currentThread().name)
                }

                override fun onError(e: Throwable) {
                    println(e.message)
                }
            })

这段代码,它从上游向下游发射数据1、2、3,然后经由中间map、flatmap、subscribeOn、observeOn等操作符变换后,对最后生成的Observable进行了订阅,传进Observer观察者,然后等待接收数据。当然,这里的flatMap,线程切换等的使用是不规范的,只是为了举个栗子🌰。

我们看下打印结果:

I'm being Subscribe
RxCachedThreadScheduler-1
RxCachedThreadScheduler-1
1
RxCachedThreadScheduler-1
RxCachedThreadScheduler-2
16
RxCachedThreadScheduler-2
81
RxCachedThreadScheduler-2
I'm done

看完上面的代码和执行结果,我们思考两个问题。
1、为什么调用了subscribe之后,上游就会被“触发”,从而向下发射数据?
2、为什么它可以向下发射数据,它的“调用链”是如何的又是如何产生的?
为了解答这两个问题,我们不得不去探究下Rxjava的源码实现了。

首先看Observable.create所返回的ObservableCreate

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

它持有一个ObservableOnSubscribe,可以注意到,当其subscribeActual被调用到的时候,ObservableOnSubscribe的subscribe就会被触发,即source.subscribe(parent),从这里开始就会调用到我们所实现的subscribe方法,从而开始调用onNext发射数据给下游。
那么,为什么subscribeActual会被调用到呢,其入参observer又是从哪儿来的呢?我们继续看下一个操作符的map的实现。

public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
   final Function<? super T, ? extends U> function;

   public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
       super(source);
       this.function = function;
   }

   @Override
   public void subscribeActual(Observer<? super U> t) {
       source.subscribe(new MapObserver<T, U>(t, function));
   }

从这里我们就可以找到答案,我们注意到,这儿又有一个subscribeActual,而且同样调用了source.subscribe,并且传进了一个MapObserver,而这个source正是由上一步Observable.create传递进来的this;也就是说,这儿是通过“this”向上游去订阅了。我们继续看Observable 的subscribe的实现。

    public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);

            ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
            subscribeActual(observer);

我们发现,subscribe会调用到subscribeActual,传递observer订阅者,所以这里其实就会调用到了create操作符所产生的Observable的subscribeActual了。所以是从下游的subscribeActual向上游订阅,然后在上游的subscribeActual中继续再向上游订阅。

以此类推。我们栗子中最后调用的subscibe也是这样的逻辑。它会在subscribeActual中继续向上游订阅,并且传递其订阅者(观察者),其上游Observable也一样,继续向其上游订阅,直到最上游,然后在最上游的订阅方法中,执行业务逻辑,之后便开始向下游发射数据。

至于为什么可以向上游订阅,其实map的操作符已经给了答案了,在创建map这个Observable的时候,其上游就将自身“this”作为source传递给了它,于是,它能够在其subscribeActual中通过“this”向上游订阅。当然,不止有这种传递this来达到向上游订阅的目的,有的操作符可以给其下游传递一个自身的成员ObservableSource,同样可以达到被下游订阅的目的。

至于上游为什么可以向下游发射数据,也一目了然了,订阅过程中,下游一直在向上游传递观察者。
讲到这,前面提到的两个问题相信大家都已经清楚答案了。

这儿再给大家画了一张更加形象的Observable变换图,涉及到线程的切换,相信看完后会对Rxjava Observale的变换原理有更深的认识。(注:主线程中的绿色箭头有误,应该再最后一个箭头才变绿色(切换线程)的,这儿只是为了强调线程已切到主线程而已)


image.png

注:上面所述Rxjava变换原理并没有指定在特定的rxjava版本。其实rxjava1.0和rxjava2.0的变换原理都是一致的。故都适用。

好了,关于Rxjava之Observable变换原理就写到这儿了。
后面有时间再写写 Rxjava之线程变换原理浅谈

相关文章

网友评论

      本文标题:Rxjava之Observable变换原理浅谈

      本文链接:https://www.haomeiwen.com/subject/vsfjrctx.html