本文试图解释以下问题:
1,RxJava的基本使用方式
2,create、map、subscribe等操作到底做了什么
3,RxJava处理多线程并发的实现原理,和解答各种疑问
0, Rxjava介绍
Rxjava通过结合函数式及响应式优点用以解决传统的面对对象编程处理异步式事务存在的繁琐冗重问题。
1,RxJava的基本使用方式
写个demo,方便我们跟源码。首先在我们gradle工程中添加依赖:
implementation 'io.reactivex.rxjava3:rxjava:3.0.2'
implementation 'io.reactivex.rxjava3:rxandroid:3.0.0'
最新版本可以通过RxAndroid开源库查询。
demo类如下:
public class RxBuilder {
private static String TAG = "RxBuilder";
//被观察者
private Observable<String> mObservable;
private RxBuilder() {
mObservable = Observable
.create(new ObservableOnSubscribe<String>() { //事件源头
@Override
public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable {
Log.d(TAG,this.getClass().getName() + " : subscribe");
emitter.onNext("from_ObservableOnSubscribe");
}
})
.map(new Function<String, String>() { //有时我们会需要使用操作符进行变换
@Override
public String apply(String s) throws Throwable {
Log.d(TAG,this.getClass().getName() + " : apply");
return s;
}
});
//.subscribeOn(Schedulers.io()) //指定事件源代码执行的线程
//.observeOn(AndroidSchedulers.mainThread()); //指定订阅者代码执行的线程
//参数是我们创建的一个观察者,绑定被观察者
mObservable.subscribe(new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
Log.d(TAG,this.getClass().getName() + " : onSubscribe");
}
@Override
public void onNext(@NonNull String s) {
Log.d(TAG,this.getClass().getName() + " : onNext: " + s);
}
@Override
public void onError(@NonNull Throwable e) {
Log.d(TAG,this.getClass().getName() + " : onError");
}
@Override
public void onComplete() {
Log.d(TAG,this.getClass().getName() + " : onComplete");
}
});
}
public static synchronized RxBuilder getInstance() {
return InstantHolder.mInstant;
}
private static class InstantHolder {
private static RxBuilder mInstant = new RxBuilder();
}
}
打印如下:
D/RxBuilder: com.example.activitytest.rxjava.RxBuilder$3 : onSubscribe
D/RxBuilder: com.example.activitytest.rxjava.RxBuilder$2 : subscribe
D/RxBuilder: com.example.activitytest.rxjava.RxBuilder$1 : apply
D/RxBuilder: com.example.activitytest.rxjava.RxBuilder$3 : onNext: from_ObservableOnSubscribe
面对demo和打印结果,我有以下疑问:
1,apply是怎么被调用到的?
2,为什么我们需要发射器ObservableEmitter,而不是直接拿注册的Observer调用其消息接受函数就完事?
下面就给出调用流程,理解了流程,自然就有了答案。
2,create、map、subscribe等操作到底做了什么
调用流程图示如下:
RxJava调用流程
在调用subscribe注册观察者之前,每次使用操作符(map、subscribeOn等)会创建新的Observable子类,并把传入的上层Observable子类作为成员变量保存起来。
结构就像是欧罗斯套娃!
Observable和Observer都是套娃
下面详细描述RxJava调用流程图:
①最下层的Observable子类调用subscribe注册观察者后,会不断递归调用上层Observable实例的subscribe方法,将下层传入的Observer实例封装成自己的Observer实例。
又是一个欧罗斯套娃。以ObservableMap为例:
@Override
public void subscribeActual(Observer<? super U> t) {
//source就是上一层的Observable实例
//给Observer封装一个Function再传给上一层Observable
//MapObserver封装使用了装饰模式,之后每次给观察者发消息,都会先调用入参的Function
source.subscribe(new MapObserver<T, U>(t, function));
}
②通过步骤一,来到了ObservableCreate实例,并调用Observer的onSubscribe方法。需要注意的是,此时的Observer早已不是那个我们自定义的清纯少年,而是经历了层层Observable包装后历经风尘的情场老手了:
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<>(observer);
//发射器封装了经过层层包装的Observer
observer.onSubscribe(parent);
try {
//这里是第三步,调用事件源ObservableOnSubscribe的subscribe方法
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
传入Disposable的子类CreateEmitter,可以调用Disposable.dispose()方法取消订阅。
我们来看这个情场老手Observer是怎么将他的风尘往事娓娓道来的:
//MapOptionalObserver基类BasicFuseableObserver的onSubscribe方法
public final void onSubscribe(Disposable d) {
if (DisposableHelper.validate(this.upstream, d)) {
this.upstream = d;
if (d instanceof QueueDisposable) {
this.qd = (QueueDisposable<T>)d;
}
if (beforeDownstream()) {
//downstream就是下层的Observer!开始套娃
downstream.onSubscribe(this);
afterDownstream();
}
}
}
所以,Observer是从上往下一层层调用包装类的onSubscribe,直到我们自定义的Observer,那个清纯少年。
③调用事件源的subscribe方法,的见上方代码“source.subscribe(parent)”。传给事件源的发射器,实现类是也是上面Disposable的子类CreateEmitter。发射器Emitter封装了Observer。封装的,是那个情场老手Observer。
④事件源中用发射器给观察者Observer发onNext消息,开始真正的响应式事件传递。
⑤前面提到,发射器Emitter中封装了情场老手Observer,所以通过发射器调用Observer的方法,如onNext,也会像剥洋葱一样递归地一层一层调用被包装的Observer.onNext,调完一层就拿掉一层套娃,再去调下一层封装的Observer.onNext,直到调到我们自定义的Observer。
所以在ObservableMap中增加了Function形成的Observer封装,在接受onNext消息时就会先于我们注册时自定义的Observer被调用,将上层Observer.onNext(T)传入的T类型转换为新的U类型传入下层的Observer。而且Function中的apply是先于下层的onNext被调用的:
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);
this.mapper = mapper;
}
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != NONE) {
downstream.onNext(null);
return;
}
U v;
try {
//这里会调用Function.apply !! 将上层Observer.onNext(T)传入的T类型转换为新的U类型
v = Objects.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
//调用下一层Observer封装的onNext
downstream.onNext(v);
}
补充内容:两种观察者模式的创建
Observable / Observer
Observable 是RxJava描述的事件流,在链式调用中非常清晰,事件从创建到加工处理再到被订阅者接收到,就是一个接一个的Observable形成的一个事件流。
同样是链式调用,但它与我们常见的Builder模式不太一样,每个操作符,每次线程切换,每步都会新建一个Observable而非直接加工上一步的Observable返回给下一步。在源码中不同的加工会创建不同的Observable,比如map()会创建一个ObservableMap,subscribeOn()会创建一个ObservableSubscribeOn,但它们实际上都是Observable的子类。
Observable可以通过Observable.create方法获取,或通过Retrofit.create方法获取。
Flowable / Subscriber
与Observable / Observer相比,最大区别在于可以支持背压(Backpressure)。背压是指观察者可以动态调节被观察者发送消息的速度,以避免消息发送速度持续大于消息处理速度,导致消息缓存爆掉。有点像CPU的睿频。
Flowable的创建是和Observable一样,可以通过操作函数,例如:create(),defer(),just(),from(),rang(),timer(),interval()等方法来创建。
也可以直接通过Observable转换创建:
Observable.toFlowable()。
Flowable.create(new FlowableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull FlowableEmitter<String> e) throws Exception {
int i = 0;
while (true){
e.onNext("data:"+(i++));
}
}
}, BackpressureStrategy.DROP)//超出缓冲池的数据丢弃
.subscribeOn(Schedulers.computation())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<String>() { //绑定Subscriber
Subscription subscription;
@Override
public void onSubscribe(Subscription s) {
subscription = s;
subscription.request(1);
}
@Override
public void onNext(String s) {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
printThread(s);
subscription.request(1); //处理完了,在请求数据
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
}
});
3,多线程和并发
demo中被注释的两行,就是将Observable和Observer置于不同线程的魔法:
observable.subscribeOn(Schedulers.newThread()) //设置被观察者线程
observeOn(AndroidSchedulers.mainThread()) //设置观察者线程
如果保持被注释状态,那么现在被观察者和观察者默认是在同一个线程工作的。
RxJava 线程切换原理
RxJava线程切换的核心原理,就是将Observer和Observable实现Runnable接口,并将run方法扔到对应的Thread中执行。
带着问题去看源码:
1,observeOn和subscribeOn到底将那些代码放到了run方法中?也就是说,到底哪些代码是可以切换线程的?
2,observeOn和subscribeOn怎么影响其他操作符封装的Observer和Observable?
3,observeOn和subscribeOn多次调用,效果是怎么样的?
这里先说一些重要的结论:
1,observeOn会将下层Observer的onNext放入指定线程运行;subscribeOn会将上层Observable的subscribe放入指定线程运行。
2,因为套娃,所以Observable可以知道上层Observable,并且subscribe的调用顺序是自下而上;Observer可以知道下层Observer,且onNext调用顺序是自上而下。
3,observeOn多次调用,下层的Observer.onNext受最近的上层observeOn影响;subscribeOn多次调用,只受最上层的subscribeOn影响。
先分析observeOn方法的实现
//1,类似第一节中map等其他操作符,observeOn也会生成Observable子类ObservableObserveOn
public final Observable<T> observeOn(@NonNull Scheduler scheduler, boolean delayError, int bufferSize) {
Objects.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<>(this, scheduler, delayError, bufferSize));
}
//2,ObservableObserveOn中,将Observer封装成ObserveOnObserver
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<>(observer, w, delayError, bufferSize));
}
}
//ObserveOnObserver实现了Runnable接口!
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
implements Observer<T>, Runnable {
//ObserveOnObserver的onNext方法,注意调用了schedule
@Override
public void onNext(T t) {
...
if (sourceMode != QueueDisposable.ASYNC) {
//将上层传入的数据T放入队列
queue.offer(t);
}
schedule();
}
//传入this,也就是实现了Runnable的ObserveOnObserver!简单地说,就是为Runnable找一个线程去执行
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}
//3,ObserveOnObserver实现的run方法
@Override
public void run() {
...
//下层的Observer
final Observer<? super T> a = downstream;
//从队列中获取onNext传入的数据
v = q.poll();
//调用下层的Observer
a.onNext(v);
简单来说,observeOn会把Observer包装成一个实现了Runnable的Observer子类,然后在新线程中执行的run方法中,调用下层Observer的onNext。这里要注意哦!仅仅是将下层的onNext放入指定线程中执行!因为套娃的关系,如果下面不切换线程,那么所有下层的Observer.onNext都会运行在上层Observer.onNext所运行的线程中。
说明一下上图:
observeOn操作符会生成Observable子类ObservableObserveOn,ObservableObserveOn会将通过subscribe注册的Observer封装成ObserveOnObserver,执行onNext时还是位于ObservableObserveOn所在的线程,这时会将传入的数据放入一个队列,并且启动新线程。
ObserveOnObserver实现的run方法跑在新的线程,通过for死循环,从队列拿出数据,传给下层的Observer。这样,线程就一直运行着,只要不被回收,就不需要每次onNext都要从线程池取新线程。
再来分析subscribeOn方法的实现
observeOn影响的是下层的Observe,而observeOn影响的是上层的Observable。
我们来看observeOn操作符创建的Observable子类代码:
//实现了Runnable
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
//在run中执行向上层订阅Observer的代码!
source.subscribe(parent);
}
}
@Override
public void subscribeActual(final Observer<? super T> observer) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<>(observer);
observer.onSubscribe(parent);
//在这里传入了SubscribeTask,大概能猜到,就是在这将SubscribeTask放入了指定线程中执行!
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
再思考几个问题:
1,不考虑其他线程切换操作,使用subscribeOn将source.subscribe放到指定线程中,会影响哪些代码?
2,在subscribeOn和observerOn的同时影响下,observer.onNext和map中Function的apply方法会运行在哪个线程?
3,为什么subscribeOn只有最上层的操作有效?
解答如下:
1,影响所有(不止是上层)Observable封装的Observer的事件回调(onSubscribe、onNext等)和事件源ObservableOnSubscribe的subscribe方法!注意,我们自定义的Observer.onSubscribe不受影响,因为我们自定义的Observer位于最下层,但onNext一样会被影响。
因为Observer套娃子类是在Observable.subscribe中创建的,ObservableOnSubscribe.subscribe是由最上层ObservableCreate.subscribe调用的,都直接受subscribeOn影响。
2,运行在observerOn指定的线程中。因为observerOn影响的onNext执行顺序在subscribeOn影响的subscribe之后。
上图说明:
Observable实例都是在主线程创建的(假定Observable.create运行于主线程),当有subscribeOn的操作符调用,上面两层的Observable.subscribe就运行于指定的A线程,并在A线程创建Observer套娃实例并调用Observer.onSubscribe。而下面两层的Observer.onSubscribe运行在主线程。
observerOn操作符带来的影响是,observerOn下层的Observer.onNext运行在B线程,其他层Observer.onNext运行在subscribeOn指定的A线程。
3,这个说法不准确,其实下层的subscribeOn也是有效的。看起来无效,是因为夹在两个subscribeOn中间的Obsevable.subscribe本身并没有做什么能让使用者感知到线程切换的事情。真正让使用者感知到线程切换的onNext事件回调所在线程,是由onNext调用者:事件源ObservableOnSubscribe的subscribe方法所在线程决定的,而事件源的subscribe方法又由最上层ObservableCreate.subscribe方法中调用。于是,只有最上层的subscribeOn能影响ObservableCreate.subscribe所在线程,从而影响onNext事件回调的线程。
.map(new Function<String, String>() {
@Override
public String apply(String s) throws Throwable {
Log.d(TAG,"apply1 in : " + Thread.currentThread().getName());
return s;
}
})
.subscribeOn(AndroidSchedulers.mainThread()) //主线程
.map(new Function<String, String>() {
@Override
public String apply(String s) throws Throwable {
Log.d(TAG,"apply2 in : " + Thread.currentThread().getName());
return s;
}
})
.subscribeOn(Schedulers.newThread()) //新线程
打印是
apply1 in : main
apply2 in : main
多个subscribeOn的影响
网友评论