美文网首页
RxJava分析(一)

RxJava分析(一)

作者: 人称老黄 | 来源:发表于2021-08-08 18:44 被阅读0次

    Rxjava的思想

        有一个起点和终点,在数据的流向过程中,可用增加数据的拦截 拦截时可以对数据发生改变,终点只关心它上一个拦截

    Rxjava的使用:

    dependencies{

    implementation'io.reactivex.rxjava2:rxandroid:2.0.1'

    implementation'io.reactivex.rxjava2:rxjava:2.1.6'

    }

    1.rxjava设计模式

        rxjava准确来说不是标准的观察模式,在标准的观察者模式中是一个被观察者对应多个观察者,而在rxjava是多个被观察者对应一个观察者

         为什么是个被观察者对应一个观察者? 因为我们Observable.create是一个被观察者,map操作符也是被观察者,最后我们只有一个终点,所以是多个被观察者对应一个观察者

    2.rxjava原理分析:主要对以下源码进行分析,其他操作符号原理也是差不多的,如果觉得new Observer太麻烦了 可以实现new Consumer 这个然后实现accept方法

    图1

    (1) RxJavaPlugins.onAssembly:默认情况下是返回source,如果需要监听Rx的操作符,可以调用setOnObservableAssembly方法,然后调用RxJavaPlugins.setOnObservableAssembly方法

    (1) Create操作符:创建了一个ObservableCreate对象,并且把source传给了ObservableCreate对象,然后在ObservableCreate构造方法里面保存了source

    (2) map操符号:创建一个ObservableMap对象传入一个this和mapper,这里的this的实例就是ObservableCreate,我们可以看作是ObservableCreate调用map方法  mapper就是我们自定义的Function,然后ObservableMap里面也是保存了source和Function

     (3) subscribe操作符:直接看核心代码subscribeActual方法,subscribeActual是一个抽象方法,但是我们是通过 ObservableMap调用出来的,我们直接看ObservableMap的subscribeActual方法

    图4

           调用的是source.subscribe(new MapObserver(t, function)); 这个source是什么呢?这个source就是ObservableCreate,因为在ObservableMap的是我们传入了一个this和Function,所以这里就开始调用了ObservableCreate.subscribe(new MapObserver)方法ObservableCreate并没有实现subscribe方法,但是它的父类实现了subscribe方法所以也是调用了ObservableCreate.subscribeActual方法并且把MapObserver传给了ObservableCreate.subscribeActual方法

     (4) ObservableCreate.subscribeActual:拿到了MapObserver传给了发射器CreateEmitter,CreateEmitter可以看作Observer的包装类,然后回调了observer.onSubscribe, MapObserver并没有实现onSubscribe方法,但是父类BasicFuseableObserver实现了Observer接口并且调用了actual.onSubscribe(this)   这个actual又是什么东西?  这个actual是在new MapObserver(t, function)传入的,这个t在我们调用subscribe操作符号的时候传入的new Observer,然后回调observer.onSubscribe方法;接着调用source.subscribe方法,这个source指的是create传入的new ObservableOnSubscribe

     (5) ObservableEmitter.onNext方法:当我们调用onNext方法时候也就是调用ObservableEmitter.onNext方法,ObservableEmitter的onNext方法比较简单,先是observer.onNext(t);那么这个observer又是谁呢?通过ObservableEmitter构造方法得知传入了一个observer,这个observer就是ObservableMap的MapObserver

    (5) MapObserver.onNext方法:直接看核心代码mapper.apply(t),这个mapper又是什么呢?回到我们的第二步是不是传入一个mapper,这个mapper就是Function,然后又将Function传给了这个MapObserver,所以就将这个转换后的数据apply过去.然后接着调用downstream.onNext(v)方法,这个downstream又是啥呢?这个downstream回到图4,在构造方法传入了一个t,这个t就是我们的观察者new Observer 所以downstream.onNext方法就会回调了观察者的onNext方法

    为了方便理解绘画了一份思维导图

    总结:rxjava的结构是一种u形的结构,主要是subscribe开始的,先是调用的ObservableMap.subscribeActual方法,以打包的形式new MapObserver(t, function) 回调了ObservableCreate.subscribeActual,然后在ObservableCreate的subscribeActual方法里面进行了一个拆包,拆包是从最外面开始拆的也就说调用了MapObserver.onNext方法进行数据的回调.

    1.创建一个ObservableCreate对象 ,并且内存存储了同一个自定义source

    2.调用map操作符,内部创建一个ObservableMap对象,内存存储了一个source和function ,这个source指的就是ObservableCreate,那是应该我们通过ObservableCreate调用map操作符

    3.subscribe方法会传入一个Observer观察者,然后调用subscribeActual方法subscribeActual是一个抽象方法 但是ObservableMap.实现了subscribeActual方法

    4.ObservableMap.subscribeActual调用source.subscribe(MapObserver()); 这个source就是ObservableCreate 也就是说调用了ObservableCreate.subscribeActual方法

    5.ObservableCreate.subscribeActual内部先创建一个CreateEmitter发射器,这个发射器相当于将MapObserver的包装类,然后调用observer.onSubscribe()方法 然后就回调了onSubscribe方法,接着调用source.subscribe(CreateEmitter),因为这个source 是在ObservableCreate的时候传入的一个自定义source,所以就回调了subscribe方法

    5.ObservableEmitter.onNext方法 调用发射器的onNext ,内部也有一个observer观察者,然后调用 MapObserver.onNext ,然后回到observer.onNext方法,它会调用mapper.apply()方法将数据转换需要的类型,然后回调onNext方法,并且返回数据

    相关文章

      网友评论

          本文标题:RxJava分析(一)

          本文链接:https://www.haomeiwen.com/subject/vkyrvltx.html