https://www.jianshu.com/p/3dd582bb10cc
https://www.jianshu.com/p/88aa273d37be
Observable 可观察的 被观察者
Observer 观察者
Subscribe 订阅
在RxJava中,可以非常方便的实现线程间切换,subscribeOn(Schedulers.io())用于指定上游线程,observeOn(AndroidSchedulers.mainThread())用于指定下游线程,多次调用subscribeOn指定上游线程只有一次有效,多次调用observeOn指定下游线程,每次都有效,比直接使用Handler省力不少,也不用担心内存泄漏的问题。

图中可以看出具体实现类只有ObservableCreate和CreateEmitter,CreateEmitter是ObservableCreate的内部类,
Observable mObservable = new ObservableCreate(new ObservableOnSubscribe());
当观察者订阅主题后,
mObservavble.subscribe(mObserver);
ObservableCreate中的subscribeActual()方法就会执行,该方法中会创建CreateEmitter实例。。。
subscribeActual()是实现上下游之间订阅关系的重要方法
subscribeOn方法会创建ObservableSubscribeOn对象,构造方法中传入自己和scheduler,ObservableSubscribeOn是Observable的一个子类,实现了HasUpstreamObservableSource接口,接口中的source()方法返回类型是ObservableSource,也就是说ObservableSubscribeOn这个Observable是一个拥有上游的Observable,source就代表了他的上游,ObservableSubscribeOn里的SubscribeTask 是实现了Runnbale接口的,这个run方法要执行的内容就是实现ObservableSubscribeOn的上游和Observer的订阅,上游事件是怎么给弄到子线程里去的?就是直接把订阅方法放在了一个Runnable中去执行,这样就一旦这个Runnable在某个子线程执行,那么上游所有事件只能在这个子线程中执行了。
Worker类是Scheduler内部的一个静态抽象类,实现了Disposable接口,
newThread() 方法经过层层委托处理,最终我们需要的就是一个NewThreadScheduler的实例。
NewThreadScheduler 是Scheduler的一个子类,在他的静态代码块中构造了一个Priority=5的线程工厂。而在我们最最关注的createWorker()方法中他又用这个线程工厂创建了一个NewThreadWorker 的实例。NewThreadWorker构造函数中,通过NewThreadScheduler中提供的线程工厂threadFactory创建了一个ScheduledExecutorService。用Executors,创建了一个核心线程为1的线程。子线程是谁如何创建的?在NewThreadScheduler的createWorker()方法中,通过其构建好的线程工厂,在Worker实现类的构造函数中创建了一个ScheduledExecutorService的实例,是通过SchedulerPoolFactory创建的。
ObservableSubscribeOn,他是subscribeOn 返回的Observable对象,他持有一个Scheduler 实例的引用,而这个Scheduler实例就是NewThreadScheduler(即Schedulers.newThreade())的一个实例。ObservableSubscribeOn 的subscribeActual方法,会触发NewThreadScheduler去执行SubscribeTask中定义的任务,而这个具体的任务又将由Worker类创建的子线程去执行。这样就把上游事件放到了一个子线程中实现。
多次用 subscribeOn 指定上游线程为什么只有第一次有效?
因为上游Observable只有一个任务,就是subscribe(准确的来说是subscribeActual()),而subscribeOn 要做的事情就是把上游任务切换到一个指定线程里,那么一旦被切换到了某个指定的线程里,后面的切换不就是没有意义了吗。
subscribeOn完成了以下操作:
1.创建了一个 ObservableSubscribeOn 对象,本质上来说他就是一个Observable,他同时实现了 AbstractObservableWithUpstream(HasUpstreamObservableSource )这样一个接口,是他变了一个拥有上游的Observeable。
2.在 ObservableSubscribeOn 的 subscribeActual 方法中
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
将真正的 subscribe 操作安置在了SubscribeTask这样个一个Runnable当中,这个 Runnable 将由scheduler 这个调度器负责启动,因此就把上游操作放到了 scheduler 所在的线程中。
Schedulers.newThread()或者Schedulers.io() 都是通过工厂方法的模式创建了某种指定类型的线程, 当这个特定的线程执行是,就是执行真实的 subscribe 方法,这样就把上游操作放到了一个特定的线程中去执行。
observeOn(AndroidSchedulers.mainThread())
AndroidSchedulers 并不是 RxJava 的一部分,而是RxAndroid框架里的需单独在gradle中配置,
当调用AndroidSchedulers.mainThread() 时,返回了一个HandlerScheduler 的实例,而这个实例使用到了我们非常熟悉的 Handler。
HandlerScheduler 是一个 Scheduler ,通过构造函数他获取到了主线程所在的 Handler实例。而在他的 createWorker() 方法中,他又通过这个 Handler 实例创建了一个HandlerWorker 的实例,这个HandlerWorker 本质上就是一个 Worker。在他的 schedule 方法中,创建了一个 ScheduleRunnable 对象,并会把这个Runnable对象通过 handler 的 sendMessageDelayed 方法发送出去,而我们知道这个 Handler 是主线程,这样在下游中,就把任务从某个子线程转移到了UI线程。
网友评论