Rxjava是一个异步基于事件流的一个库,是复杂的逻辑保持整洁,响应式扩展。每家公司的安卓项目基本都有这个库的存在,所以平时有时间还是要去看看的实现
基本的用法
Rxjava不指定线程时在没有指定线程的情况下,程序运行起来,所有的方法执行都是在主线程执行的。Observerable,Observer,这两者是通过subscribe,联合起来的。如何建立起关联的?先看看Observerable,ObservableSource,ObservableCreate,ObservableEmitter,ObservableOnSubscribe,Observer这几个类的关系。用蹩脚的画图工具来理一理这几个类的关系。
关键的几个类的UML图。
RxJava相关的几个类的关系图是用Windows的画图工具画的,画得不专业,但是很有利于帮助自己理清思路,这里面还没有涉及到线程相关的类,下面会有相关的类,关于线程的。建议都可以去画一画,理一理思路。
结合上面的图可以大概梳理Observable和observer是怎么通过subscribe()关联的,
1.Observable.create(ObservableOnSubscribe source)
该方法创建了一个ObservableCreate对象。
2.ObservableCreate调用subscribe(Observer observer)方法,实际上是调用了父类的Observable的subscribe的方法,转而调用Observable的抽象方法,subscribeActual(Observer observer),抽象方法被ObservableCreate实现,
Observable的subscribe的方法实现3.ObservableCreate的subscribeActual(Observer )方法,创建了一个CreateEmitter事件发生器,然后调用observer的onSubscribe,把disposable抛到外层,业务层做处理,CreateEmitter,每发一个事件,首先判断这个-流(disposable)是否被切断了,再去发事件。
ObservableCreate的subscribeActual(Observer )方法4.最后调用ObservableOnSubscribe的subscribe(CreateEmitter emitter),把事件发射器丢到了业务层,又业务控制发射的事件流。
简单的RxJava使用时的源码剖析,其实很简单,接下来加入线程的情况去分析。
Rxjava指定线程
指定Rxjava subscribe时或者Observer的线程,涉及到的了几个类。ObservableSubscribeOn,ObservableObserveOn,和Schedulers,同样来看看指定线程时Rxjava的使用
Rxjava指定线程这里主要拿RxJava IO的例子来分析,上面的例子每一步类的转变如下
Rxjava异步各个类的转换左边可以看成是上游,右边是下游,左边是事件发生的,右边是接受事件的。在源码中可以看到有upstream和downstream,两者发生联动是方法subscribe触发的。
1.调用subscribeOn的时候传入了Schedule.Io(IoScheduler)指定了subscribe执行的线程放在Io线程。新建了一个task放入线程池执行源头的suscribe方法,新建了两个类ObservableSubscribeOn和SubscribeOnObserver.
2.map(funtion(t,r)) 该方法新建了两个类ObservableMap和MapObserver,ObservableMap承载了上游的数据(ObservableSubscribeOn)作为T,MapObserver包含了下游ObserveOnObserver
3.ObservableObserveOn,指定了Observer观察者的方法所在的线程。下游执行到最底层的Observer会创建一个worker用来执行onNext和OnComplete等。
RxJava几种线程池
Schedulers.SINGLE
拥有一个线程的单例,所有的任务都在这个线程执行,当前有任务在执行的时候,有新的任务来的时候会等待,先进先出。
Schedulers.COMPUTATION
适用于cpu密集型的计算任务,并不适用于IO
Schedulers.IO
Io密集型任务,支持异步阻塞IO
Schedulers.TRAMPOLINE
表示立即执行,当前有任务的话会暂停,先执行来的任务,执行完了再去执行暂停的任务
Schedulers.NEW_THREAD
每次都会新建一个线程去处理任务,
理解Rxjava线程池异步的工作原理
Subscribe指定线程原理
要理解Rxjava线程的工作,理清楚几个类的职责,大体的思路基本都可以了
这里异步的方法举例IO线程的异步,其他的大体也是这个思路。
1.subscribeOn(Schedulers.io())
这个方法指定subscribe方法执行在哪个线程执行,当这个方法执行的时候,会新建一个SubscribeTask,把上游的对象source传进来,然后调用IoSchedulers的scheduleDirect(SubscribeTask)方法
2.接着调用IoScheduler的createWorker(),这个方法会从CacheWorkerPool里面拿一个Worker出来,用着个woker来作为EventLoopWorker构造函数的参数,新建一个EventLoopWorker对象
3.调用EventLoopWorker的schedule方法,实际上是调用CacheWorkerPool里面的worker对象的方法threadWorker.scheduleActual(action, delayTime, unit, tasks)
4.ThreadWorker集成的是NewThreadWorker,NewThreadWorker里有一个线程池,庐山真面目已经露出来了ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory);,
5.最后调用的方法scheduleActual,里面实际是ScheduledExecutorService executor的submit方法,或者是schedule方法,如果是指定了延迟时间就是schedule方法,否则就是前者。
这就是大体的一个流程,结合上面的类图关系,基本上可以理清他们之间的工作关系了
这里提出一个问题
1.subscribeOn()是不是只有第一次有效?为什么说只有第一次有效?
subscribeOn()为什么说只有IO线程器作用,而mainThread不起作用,?其实这种说法是错的,看了源码其实mainthread也是起作用的,mainThread的Task先执行了,在主线程调用了上游的ObservableSource 的suscribe的方法,而上游的ObservableSubscribeOn指定了Scheduler.IO线程,这时候就会调用这一层的Task的run方法,这里再调用上上层的suscribe方法,这样就只有第一次有效了。说起来好绕口,画个图吧。
.subscribeOn()是不是只有第一次有效?为什么说只有第一次有效?
observe指定线程的原理
这个相对来说就简单很多了,只要看一个类ObserveOnObserver
这个类实现了runnable的方法,当接收到onNext的事件的时候。会调用worker.sheche(),把当前的ObserveOnObserver传进去,触发了run方法,然后在run方法中调用真正observer的onNext 和 onComplete,onError等方法。达到了再特定线程监听。
网友评论