美文网首页
自己看代码写的RxJava2.0源码剖析乱七八糟的

自己看代码写的RxJava2.0源码剖析乱七八糟的

作者: Btwo | 来源:发表于2019-09-30 19:13 被阅读0次

    Rxjava是一个异步基于事件流的一个库,是复杂的逻辑保持整洁,响应式扩展。每家公司的安卓项目基本都有这个库的存在,所以平时有时间还是要去看看的实现

    基本的用法

    Rxjava不指定线程时

    在没有指定线程的情况下,程序运行起来,所有的方法执行都是在主线程执行的。Observerable,Observer,这两者是通过subscribe,联合起来的。如何建立起关联的?先看看Observerable,ObservableSource,ObservableCreate,ObservableEmitter,ObservableOnSubscribe,Observer这几个类的关系。用蹩脚的画图工具来理一理这几个类的关系。

    关键的几个类的UML图。

    RxJava相关的几个类的关系

    图是用Windows的画图工具画的,画得不专业,但是很有利于帮助自己理清思路,这里面还没有涉及到线程相关的类,下面会有相关的类,关于线程的。建议都可以去画一画,理一理思路。

    结合上面的图可以大概梳理Observable和observer是怎么通过subscribe()关联的,

    1.Observable.create(ObservableOnSubscribe source) 

        该方法创建了一个ObservableCreate对象。

    2.ObservableCreate调用subscribe(Observer observer)方法,实际上是调用了父类的Observable的subscribe的方法,转而调用Observable的抽象方法,subscribeActual(Observer observer),抽象方法被ObservableCreate实现,

    Observable的subscribe的方法实现

    3.ObservableCreate的subscribeActual(Observer )方法,创建了一个CreateEmitter事件发生器,然后调用observer的onSubscribe,把disposable抛到外层,业务层做处理,CreateEmitter,每发一个事件,首先判断这个-流(disposable)是否被切断了,再去发事件。

    ObservableCreate的subscribeActual(Observer )方法

    4.最后调用ObservableOnSubscribe的subscribe(CreateEmitter emitter),把事件发射器丢到了业务层,又业务控制发射的事件流。

        简单的RxJava使用时的源码剖析,其实很简单,接下来加入线程的情况去分析。

    Rxjava指定线程

    指定Rxjava subscribe时或者Observer的线程,涉及到的了几个类。ObservableSubscribeOn,ObservableObserveOn,和Schedulers,同样来看看指定线程时Rxjava的使用

    Rxjava指定线程

    这里主要拿RxJava IO的例子来分析,上面的例子每一步类的转变如下

    Rxjava异步各个类的转换

    左边可以看成是上游,右边是下游,左边是事件发生的,右边是接受事件的。在源码中可以看到有upstream和downstream,两者发生联动是方法subscribe触发的。

    1.调用subscribeOn的时候传入了Schedule.Io(IoScheduler)指定了subscribe执行的线程放在Io线程。新建了一个task放入线程池执行源头的suscribe方法,新建了两个类ObservableSubscribeOn和SubscribeOnObserver.

    2.map(funtion(t,r)) 该方法新建了两个类ObservableMap和MapObserver,ObservableMap承载了上游的数据(ObservableSubscribeOn)作为T,MapObserver包含了下游ObserveOnObserver

    3.ObservableObserveOn,指定了Observer观察者的方法所在的线程。下游执行到最底层的Observer会创建一个worker用来执行onNext和OnComplete等。

    RxJava几种线程池

    Schedulers.SINGLE

    拥有一个线程的单例,所有的任务都在这个线程执行,当前有任务在执行的时候,有新的任务来的时候会等待,先进先出。

    Schedulers.COMPUTATION

    适用于cpu密集型的计算任务,并不适用于IO

    Schedulers.IO

    Io密集型任务,支持异步阻塞IO

    Schedulers.TRAMPOLINE

    表示立即执行,当前有任务的话会暂停,先执行来的任务,执行完了再去执行暂停的任务

    Schedulers.NEW_THREAD

    每次都会新建一个线程去处理任务,

    理解Rxjava线程池异步的工作原理

    Subscribe指定线程原理

    要理解Rxjava线程的工作,理清楚几个类的职责,大体的思路基本都可以了


    Rxjava异步线程工作原理

    这里异步的方法举例IO线程的异步,其他的大体也是这个思路。

    1.subscribeOn(Schedulers.io())

    这个方法指定subscribe方法执行在哪个线程执行,当这个方法执行的时候,会新建一个SubscribeTask,把上游的对象source传进来,然后调用IoSchedulers的scheduleDirect(SubscribeTask)方法

    2.接着调用IoScheduler的createWorker(),这个方法会从CacheWorkerPool里面拿一个Worker出来,用着个woker来作为EventLoopWorker构造函数的参数,新建一个EventLoopWorker对象

    3.调用EventLoopWorker的schedule方法,实际上是调用CacheWorkerPool里面的worker对象的方法threadWorker.scheduleActual(action, delayTime, unit, tasks)

    4.ThreadWorker集成的是NewThreadWorker,NewThreadWorker里有一个线程池,庐山真面目已经露出来了ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory);,

    5.最后调用的方法scheduleActual,里面实际是ScheduledExecutorService  executor的submit方法,或者是schedule方法,如果是指定了延迟时间就是schedule方法,否则就是前者。

    这就是大体的一个流程,结合上面的类图关系,基本上可以理清他们之间的工作关系了

    这里提出一个问题

    1.subscribeOn()是不是只有第一次有效?为什么说只有第一次有效?

    subscribeOn()

    为什么说只有IO线程器作用,而mainThread不起作用,?其实这种说法是错的,看了源码其实mainthread也是起作用的,mainThread的Task先执行了,在主线程调用了上游的ObservableSource 的suscribe的方法,而上游的ObservableSubscribeOn指定了Scheduler.IO线程,这时候就会调用这一层的Task的run方法,这里再调用上上层的suscribe方法,这样就只有第一次有效了。说起来好绕口,画个图吧。

    .subscribeOn()是不是只有第一次有效?为什么说只有第一次有效?  


    observe指定线程的原理

    这个相对来说就简单很多了,只要看一个类ObserveOnObserver

    这个类实现了runnable的方法,当接收到onNext的事件的时候。会调用worker.sheche(),把当前的ObserveOnObserver传进去,触发了run方法,然后在run方法中调用真正observer的onNext 和 onComplete,onError等方法。达到了再特定线程监听。

    相关文章

      网友评论

          本文标题:自己看代码写的RxJava2.0源码剖析乱七八糟的

          本文链接:https://www.haomeiwen.com/subject/fwddkctx.html