线程切换
只要使用RxJava肯定对下面的代码特别熟悉
Observable.from(list)
.subcribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1(){
public void call(Object obj){
}
});
其中subscribeOn()可以把事件发生的线程切换到io线程,observeOn()可以把处理事件的线程切换到Android应用程序主线程。
那他是怎么做到这么简洁的切换呢?
Rxjava的所有变换都基于一个lift模型,我们接下来介绍一下这个模型。
首先回顾一下Observable通知Subscriber的原理。
在生成Observable的时候我们会传入一个OnSubscribe的实例,在发生订阅关系subscribe
方法中,OnSubscribe实例的call(Subscriber)
方法中就会调用传入的Subscriber的的相关方法,从而实现通知,消息发送。
流程大概如下
Observable—(实例化时)—>OnSubscribe—(订阅关系发生subscribe时) —>Subscriber
那首先考虑一下我们怎么实现在Observable中发送的是 圆形
事件,但是在Subscriber中接收到 方形
事件并处理呢。
如果Observable和Subscriber的事件都不一样,这都不能发生订阅关系的,因为在编译检查的时候就无法通过。
RxJava采用的方法是提供一个变换方法lift(Operater)
,该方法返回一个Observable对象。该Observable对象会发送方形
事件,这样就可以用这个新的Observable对象来订阅原始的Subscriber了。
在新生成的Obaservable对象的时候,我们也会生成该Observable对应的OnSubscribe对象,并实现新的OnSubscribe对象的call(Subscriber)
方法。由于我们用新生成的Observable对象去订阅原始的Subscriber,所以新生成的OnSubscribe的call方法中的参数就是原始的Subscriber了。
接下来就是lift(Operator)
中Operator接口发挥作用的时候了,这个Operator接口的call方法实现把一个Subscriber变换成另一个Subscrber的功能。
在这里就是把原始的响应方形
事件的Subscriber转换成响应圆形
事件的Subscriber,这样就可以调用原始的OnSubscribe的call方法,把圆形
事件变成发送给这个新的响应圆形事件的Subscriber.
这样新的Observable中就同时包含了:
- 原始的OnSubscribe(它能发送
圆形
事件) - 新的Subscriber(它能接受
圆形
事件) - 原始的Subscriber(它能接受
方形
事件)
这样,在发生订阅关系时,原始的Observable(原始的OnSubscribe)会发送圆形
事件给新的Subscriber,新的Subscriber在处理的时候,就把这个事件转换一下传递给原始的Subscriber。
基本流程就是这样,Observable.Operator就是实现新老Subscriber关联的纽带。
借用扔物线的snippet
//这个lift的作用就是把只发送圆形事件的Observable转换成发送方形事件的Observable
public <Rectangle> Observable<Rectangle> lift(Operator<? extends Rectangle, ? super Round> operator) {
return Observable.create(new OnSubscribe<Rectangle>() {
@Override
//这个参数subscriber是原始的subscriber,只接受方形事件
public void call(Subscriber subscriber) {
//这个newSubscriber是operator生成的新的Subscriber,它可以接收圆形事件
//新的subscriber会调用原始的sunscriber的相关方法
Subscriber newSubscriber = operator.call(subscriber);
newSubscriber.onStart();
//这个onSubscribe是原始的OnSubscribe,它发送圆形事件
//所以可以用新生成的newSubscriber来接收
onSubscribe.call(newSubscriber);
}
});
}
记不住以上内容也没有关系,只要记住在变换中,会成成一个新的Observable和Subscriber就可以了,我们在变换完之后所进行的操作都是针对新生成的Observable和新Subscriber。
经过变换之后,我们拥有两个Observable(OnSubscribe),也拥有两个Subscriber.
那我们想要切换事件处理的线程怎么办呢?我们可以在Operator中生成新的Subscriber的时候进行处理,在newSubscriber和原始subscriber进行映射的时候进行切换,所以可以知道observeOn()切换的是所有它下游的线程。
所以如果我们想要切换事件发生的线程,会怎么办呢?根据上面的代码,可以知道,只需要让最后的onSubscribe.call(newSubscriber)
运行在新线程就可以了。从这句代码也可以看出,subscribeOn()方法切换的是它的上游线程,这种线程切换一直会影响到最原始的observable。
但是如果在一条链式调用中出现了多个subscribeOn()方法,由于链式调用最上游的第一个subscribeOn方法会直接影响到最原始的observable,而在接下来的的链式调用中消息的发送是有newSubscriber来控制的,所以第二个subscribeOn方法不会影响线程的切换。
多说无益,直接上代码
Observable.just("hello","world","rxjava","rxandroid")
.map(new Func1<String, String>() {
@Override
public String call(String s) {
//此处由于受到下面第一个subscribeOn的影响,输出RxComputationScheduler-1
Log.v("chicodong","thread 1 is: "+Thread.currentThread().getName());
return s.toUpperCase();
}
})
.subscribeOn(Schedulers.computation())
.map(new Func1<String, Integer>() {
@Override
public Integer call(String s) {
//此处由于上一个subscribeOn切换过线程,新生成的subscriber是在RxComputationScheduler-1发送事件的,所以仍然输出RxComputationScheduler-1
Log.v("chicodong","thread 2 is: "+Thread.currentThread().getName());
return s.length();
}
})
.map(new Func1<Integer, Integer>() {
@Override
public Integer call(Integer integer) {
//此处由于上一个subscribeOn切换过线程,新生成的subscriber是在RxComputationScheduler-1发送事件的,所以仍然输出RxComputationScheduler-1
Log.v("chicodong","thread 3 is: "+Thread.currentThread().getName());
return integer+100;
}
})
//此处的第二个subscribeOn相当于没有起作用
.subscribeOn(Schedulers.io())
.map(new Func1<Integer, String>() {
@Override
public String call(Integer integer) {
//此处由于上一个subscribeOn切换过线程,新生成的subscriber是在RxComputationScheduler-1发送事件的,所以仍然输出RxComputationScheduler-1
Log.v("chicodong","thread 4 is: "+Thread.currentThread().getName());
return integer.toString();
}
})
.observeOn(Schedulers.newThread())
.map(new Func1<String, String>() {
@Override
public String call(String s) {
//此处受到obServeOn的影响,输出RxNewThreadScheduler-1
Log.v("chicodong","thread 5 is: "+Thread.currentThread().getName());
return s+" lalala";
}
})
.observeOn(AndroidSchedulers.mainThread())
.map(new Func1<String, String>() {
@Override
public String call(String s) {
//此处受到第二个observeOn的影响,输出main
Log.v("chicodong","thread 6 is: "+Thread.currentThread().getName());
return s.toUpperCase();
}
})
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
//此处受到第二个observeOn的影响,输出main
Log.v("chicodong","final result is "+s+" thread is "+Thread.currentThread().getName());
}
});
虽然多余一个subscribeOn对于线程切换没有影响,但是它可以在事件还没有发生时起作用,最常见的就是doOnSubscribe()方法了
Observable.from(list)
.subscribeOn(Schedulers.io())
.doOnSubscribe(new Action0(){
public void call(){
}
})
.subscribeOn(AndroidSchedulers.mainThread())
.observeOn(AndroidScheduler.mainThread())
.subscribe(new Action1(){
public void call(Object obj){
}
});
上面的代码中第二个subscribeOn对于线程切换没有影响,但是却可以使doOnSubscribe()运行在主线程中。
网友评论