美文网首页
64.RxJava线程切换

64.RxJava线程切换

作者: SlideException | 来源:发表于2020-08-05 16:54 被阅读0次

https://www.jianshu.com/p/3dd582bb10cc
https://www.jianshu.com/p/88aa273d37be

Observable 可观察的 被观察者
Observer 观察者
Subscribe 订阅

在RxJava中,可以非常方便的实现线程间切换,subscribeOn(Schedulers.io())用于指定上游线程,observeOn(AndroidSchedulers.mainThread())用于指定下游线程,多次调用subscribeOn指定上游线程只有一次有效,多次调用observeOn指定下游线程,每次都有效,比直接使用Handler省力不少,也不用担心内存泄漏的问题。


image.png

图中可以看出具体实现类只有ObservableCreate和CreateEmitter,CreateEmitter是ObservableCreate的内部类,
Observable mObservable = new ObservableCreate(new ObservableOnSubscribe());
当观察者订阅主题后,
mObservavble.subscribe(mObserver);
ObservableCreate中的subscribeActual()方法就会执行,该方法中会创建CreateEmitter实例。。。
subscribeActual()是实现上下游之间订阅关系的重要方法

subscribeOn方法会创建ObservableSubscribeOn对象,构造方法中传入自己和scheduler,ObservableSubscribeOn是Observable的一个子类,实现了HasUpstreamObservableSource接口,接口中的source()方法返回类型是ObservableSource,也就是说ObservableSubscribeOn这个Observable是一个拥有上游的Observable,source就代表了他的上游,ObservableSubscribeOn里的SubscribeTask 是实现了Runnbale接口的,这个run方法要执行的内容就是实现ObservableSubscribeOn的上游和Observer的订阅,上游事件是怎么给弄到子线程里去的?就是直接把订阅方法放在了一个Runnable中去执行,这样就一旦这个Runnable在某个子线程执行,那么上游所有事件只能在这个子线程中执行了。

Worker类是Scheduler内部的一个静态抽象类,实现了Disposable接口,
newThread() 方法经过层层委托处理,最终我们需要的就是一个NewThreadScheduler的实例。
NewThreadScheduler 是Scheduler的一个子类,在他的静态代码块中构造了一个Priority=5的线程工厂。而在我们最最关注的createWorker()方法中他又用这个线程工厂创建了一个NewThreadWorker 的实例。NewThreadWorker构造函数中,通过NewThreadScheduler中提供的线程工厂threadFactory创建了一个ScheduledExecutorService。用Executors,创建了一个核心线程为1的线程。子线程是谁如何创建的?在NewThreadScheduler的createWorker()方法中,通过其构建好的线程工厂,在Worker实现类的构造函数中创建了一个ScheduledExecutorService的实例,是通过SchedulerPoolFactory创建的。

ObservableSubscribeOn,他是subscribeOn 返回的Observable对象,他持有一个Scheduler 实例的引用,而这个Scheduler实例就是NewThreadScheduler(即Schedulers.newThreade())的一个实例。ObservableSubscribeOn 的subscribeActual方法,会触发NewThreadScheduler去执行SubscribeTask中定义的任务,而这个具体的任务又将由Worker类创建的子线程去执行。这样就把上游事件放到了一个子线程中实现。

多次用 subscribeOn 指定上游线程为什么只有第一次有效?
因为上游Observable只有一个任务,就是subscribe(准确的来说是subscribeActual()),而subscribeOn 要做的事情就是把上游任务切换到一个指定线程里,那么一旦被切换到了某个指定的线程里,后面的切换不就是没有意义了吗。

subscribeOn完成了以下操作:
1.创建了一个 ObservableSubscribeOn 对象,本质上来说他就是一个Observable,他同时实现了 AbstractObservableWithUpstream(HasUpstreamObservableSource )这样一个接口,是他变了一个拥有上游的Observeable。
2.在 ObservableSubscribeOn 的 subscribeActual 方法中
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
将真正的 subscribe 操作安置在了SubscribeTask这样个一个Runnable当中,这个 Runnable 将由scheduler 这个调度器负责启动,因此就把上游操作放到了 scheduler 所在的线程中。

Schedulers.newThread()或者Schedulers.io() 都是通过工厂方法的模式创建了某种指定类型的线程, 当这个特定的线程执行是,就是执行真实的 subscribe 方法,这样就把上游操作放到了一个特定的线程中去执行。

observeOn(AndroidSchedulers.mainThread())
AndroidSchedulers 并不是 RxJava 的一部分,而是RxAndroid框架里的需单独在gradle中配置,
当调用AndroidSchedulers.mainThread() 时,返回了一个HandlerScheduler 的实例,而这个实例使用到了我们非常熟悉的 Handler。
HandlerScheduler 是一个 Scheduler ,通过构造函数他获取到了主线程所在的 Handler实例。而在他的 createWorker() 方法中,他又通过这个 Handler 实例创建了一个HandlerWorker 的实例,这个HandlerWorker 本质上就是一个 Worker。在他的 schedule 方法中,创建了一个 ScheduleRunnable 对象,并会把这个Runnable对象通过 handler 的 sendMessageDelayed 方法发送出去,而我们知道这个 Handler 是主线程,这样在下游中,就把任务从某个子线程转移到了UI线程。

相关文章

  • 64.RxJava线程切换

    https://www.jianshu.com/p/3dd582bb10cchttps://www.jianshu...

  • Rxjava2 Note

    线程切换 subscribeOn - 切换上游线程,但仅调用的第一次生效 observeOn - 切换下游线程,调...

  • RxJava系列(三)--切换线程

    1.主线程是怎么切换到子线程2.为什么只有第一次切换有效3.子线程是怎么切换到主线程1>问题1,主线程是怎么切换到...

  • subscribeOn 谐音上 就是上面的

    subscribeOn的调用切换之前的线程。 observeOn的调用切换之后的线程。 observeOn之后,不...

  • 2020-12-07

    1、进程线程协程2、进程切换、线程切换https://www.cnblogs.com/xiangshihua/p/...

  • rxjava源码解析

    线程切换原理 案例 subscribeOn切换子线程 先看subscribe的执行,最后会执行Observable...

  • 一种估计服务器线程个数的方法

    1.估计线程数存在的难点 线程个数越多,线程切换频繁,会浪费资源在线程切换上面;线程个数少无法充分利用cpu资源,...

  • 进程和线程的对比

    问题 一 进程和线程的对比 二 为什么进程切换开销大,线程切换开销低呢? .......................

  • android协程

    Kotlin的协程,本质上是一个线程框架,它可以方便的切换线程的上下文(如主线程切换到子线程/子线程切回主线程)。...

  • RxJava源码分析(四)线程切换observeOn

    引言 前面的文章我们走完了订阅方法线程切换的实现,今天我们来看观察方法的线程切换。 线程调度observeOn 接...

网友评论

      本文标题:64.RxJava线程切换

      本文链接:https://www.haomeiwen.com/subject/zjusrktx.html