优先参考:给初学者的RxJava2.0教程 1-10系列
RxJava 3.x系列(二)操作符 & 背压(BackPressure)
一、RxJava简介
-
RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.
2012年启动,2013.2发布距今已7年,在Java和Android异步事件处理、数据过滤与变换提供函数响应式编程能力.
缘由:Netflix为增加服务器性能和吞吐量编写RxJava并开源,主要开发维护人员:David Karnok、Ben Christensen、Shixiong Zhu、Jake Wharton...
-
函数响应式编程:函数式编程+响应式编程,面向数据流和变化传播的编程范式(领域中主流的行事套路、规范化的关系);
-
适用场景以及与AsyncTask、Handler、Executor适用场景异同:复杂网络数据流请求以及与操作符对于的数据变换处理,同时包含对OkHttp、Retrofit、Gson支持;
-
与RxAndroid关系:RxJava针对Android平台的扩展,为Android开发提供响应式扩展组件;
-
与Kotlin协程对比(待进一步分析):RxJava基于Rx特性封装的Java函数库,协程是Kotlin编译器级别概念,理解为基于编译器or虚拟机设计的轻量级线程。两者都有数据变换、异步数据流处理能力;
-
RxKotlin:适配Kotlin的ReactiveX版本;
-
内存占用、代码混淆、包体积(待进一步分析):内存占用RxJava > 协程,回收RxJava创建的对象,需给予更高的 CPU占用,也会转变为更高的电量损耗.
二、RxJava第一行代码与Observable订阅过程
- RxJava第一行代码
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "subscribe");
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "onNext: " + value);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "error");
}
@Override
public void onComplete() {
Log.d(TAG, "complete");
}
});
- Obserbable订阅过程
1.整体流程:上游创建、下游创建、订阅
2.Observable create后subscribe下游Observer,使得Observable触发subscribeActual()方法
3.ObservableCreate类中subscribeActual()方法通过ObservableOnSubscribe类的subscribe()方法订阅CreateEmitter
4.Observer作为成员变量暴露给ObservableEmitter,ObservableEmitter调用onNext()、onError()、onComplete()将回调下游Observer的onNext()、onError()、onComplete()方法
注:RxJava基于观察者模式,而观察者模式本质是回调。上游订阅下游实际表现为:CreateEmitter传入Observer作为其成员变量,CreateEmitter调用onNext()发射数据,将立即回调Observer的onNext()方法
-
RxJava第一行代码类图及各方法关系如下:
RxJava_Observable_class.png
三、线程切换过程
...
emitter.onNext(3);
emitter.onComplete();
}
}) .subscribeOn(Schedulers.io()) // 切换上游线程
.observeOn(AndroidSchedulers.mainThread()) // 切换下游线程
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "subscribe");
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "onNext: " + value);
}
...
原理:RxJava提供Scheduler
作为线程调度器,底层通过封装Handler
和Executor
实现线程切换
-
上游线程切换subscribeOn(Schedulers.newThread())
subscribeOn()调用后,生成ObservableSubscribeOn对象,并将原有Observable作为成员变量传入 subscribeActual()方法中scheduler.scheduleDirect(new SubscribeTask(parent)) //实现线程切换; SubscribeTask为Runable对象,run()方法执行source.subscribe(parent); //上游随即切换到对应线程 scheduleDirect(Runnable run)方法内部创建了Worker对象,通过调用worker.shedule(Runnable, long delay, TimeUnit)实现真正线程切换,worker内部包含线程池等切线程操作
-
下游线程切换observeOn(AndroidSchedulers.mainThread())
observeOn()调用后,生成ObservableObserveOn对象,并将原有Observable作为成员变量传入 subscribeActual()方法中通过Scheduler.Worker w = scheduler.createWorker()创建Worker对象 source.subscribe(new ObserveOnObserver<>(observer, w, delayError, bufferSize)) 完成上下游订阅(给下游传入了Worker对象) 上游发射数据后,即ObserveOnObserver的onNext()方法被调用,导致worker.schedule(this)执行 ObserveOnObserver即上述方法中的this,它实现了Runnable接口,其run()方法运行在指定线程,run()中也会调用下游onNext()方法,下游开始执行,故下游即被切换到对应线程 注:下游线程切换后,由于上下游线程不同,上游发射数据后会将数据发射到消息队列,ObserveOnObserver拥有消息发送队列,被观察者发送消息即入队,下游从消息队列中取数据。消息队列queue是ObserveOnObserver的成员变量,queue创建于ObserveOnObserver的onSubscribe()方法,queue = new SpscLinkedArrayQueue<>(bufferSize)
-
注:上下游同线程,上游发射一次onNext(),下游onNext随即被调用一次,因为属于同步调用;上下游不同线程,属于异步调用,上游发射onNext()会进入消息队列,下游会从消息队列取数据回调其onNext(),故上下游调用顺序不能保证。
-
subscribeOn()
切换线程时序图 注:红色方框内为重点切线程操作
subscribeOn_sequence.png
-
observeOn()
切换线程时序图
observeOn_sequence.png
参考:
ReactiveX - Operators
ReactiveX/RxJava文档中文翻译
Github-ReactiveX-RxJava
详解RxJava消息订阅和线程切换原理
Rxjava线程变换原理浅谈
P.S. 本文源码基于RxJava 3.0.4
结尾时序图引用自微信公众号
By the way: 非常感谢能提出批评和修改意见
contact: tu.wentai@nexuslink.cn
WeChat:HUC-Kris
网友评论