RxJava中的线程
默认的情况下,Observable 和 Observer是处在同一线程的,发送事件在哪个线程,处理事件同样也在该线程。
在Activity的onCreate方法中运行以下代码:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
Log.d("RxJava", "Observable Thread: " + Thread.currentThread().getName());
e.onNext(1);
}
})
.subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
Log.d("RxJava", "Observer Thread: " + Thread.currentThread().getName());
}
});
可以得到以下结果:
D/RxJava: Observable Thread: main
D/RxJava: Observer Thread: main
RxJava中,使用Scheduler来进行线程控制,从而实现了关键的异步操作。
Scheduler
Scheduler可以称之为线程调度器,它指定了发送和处理事件所在的线程。常用的API有以下几个:
- Schedulers.newThread():启用新线程并在新线程运行
- Schedulers.io():进行I/O 操作所使用的Scheduler,它的内部实现是一个无上限的线程池,可以重用空闲线程,比newThread更有效率,通常用于读写文件,数据库,网络操作等。
- Schedulers.computation():CPU密集计算所用的Scheduler,它内部是一个线程数等于CPU核心数的线程池。
- AndroidSchedulers.mainThread(): Android中的主线程(UI线程)。
介绍完了常用API之后,通过下面的例子来看一下是怎样使用的:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
Log.d("RxJava", "Observable Thread: " + Thread.currentThread().getName());
e.onNext(1);
}
})
.subscribeOn(Schedulers.newThread())//指定observable线程
.observeOn(AndroidSchedulers.mainThread())//指定Observer线程
.subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
Log.d("RxJava", "Observer Thread: " + Thread.currentThread().getName());
}
});
还是上面的例子,加了两行代码,subscribeOn和observeOn。subscribeOn用来指定发送事件的线程,即事件产生的线程,observeOn指定接收并处理事件的线程,即事件消费线程。运行结果如下:
D/RxJava: Observable Thread: RxNewThreadScheduler-1
D/RxJava: Observer Thread: main
subscribeOn和observeOn都可以多次设置,但是subscribeOn只有第一次设置的值会生效,而observeOn不一样,观察者会按照observeOn的指定顺序依次切换到最后一个线程。
操作符
操作符的作用是在事件发送的过程中完成一些特定的操作,比如对事件的包装,添加额外的动作等等。常用操作符主要有以下几种:
-
map();
map的作用是将observable的数据进行加工,转换成一个新的数据之后再进行发送。看一个具体的例子:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
}
})
.map(new Function<Integer, String>() {
@Override
public String apply(@NonNull Integer integer) throws Exception {
return "This is Data No. " + integer ;
}
})
.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String str) throws Exception {
Log.d("RxJava", "Received data: " + str);
}
});
输出结果如下:
D/RxJava: Received data: This is Data No. 1
D/RxJava: Received data: This is Data No. 2
D/RxJava: Received data: This is Data No. 3
-
FlatMap()
FlatMap与map类似,但是功能更强大了。map只是对Observable发送的数据进行处理,返回的是处理后的数据,而FlatMap在数据处理之后返回的是一个Observable对象,所以,FlatMap实际上是对原来的一系列事件进行加工然后分拆,将每一个数据包含在一个新的Observable对象中发送给下游的观察者。这样做有什么好处? 举一个简单的例子,如果每一个事件都是耗时操作,那么采用FlatMap,将事件分发给不同的Observable,然后加入Schedulers.io(),这样效率瞬间提高了。示例如下:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
//原始事件,打印线程
Log.d("RxJava", "Original Observable Thread: " + Thread.currentThread().getName());
e.onNext(10);
e.onNext(20);
e.onNext(30);
}
})
.flatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(@NonNull final Integer integer) throws Exception {
return Observable.create(new ObservableOnSubscribe<String>(){
@Override
public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
//打印FlatMap转换后,发送事件的线程
Log.d("RxJava", "Observable Thread: " + Thread.currentThread().getName());
Thread.sleep(1000);
e.onNext("This is Data No." + integer);
}
//指定flatMap转换后发送事件所处的线程
}).subscribeOn(Schedulers.io());
}
})
//指定原始事件发送线程
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String str) throws Exception {
//打印观察者所处的线程
Log.d("RxJava", "Observer Thread: " + Thread.currentThread().getName());
Log.d("RxJava", "Received data: " + str);
}
});
运行结果如下:
Original Observable Thread: RxCachedThreadScheduler-1
D/RxJava: Observable Thread: RxCachedThreadScheduler-1
D/RxJava: Observable Thread: RxCachedThreadScheduler-2
D/RxJava: Observable Thread: RxCachedThreadScheduler-3
Observer Thread: main
D/RxJava: Received data: This is Data No.10
D/RxJava: Observer Thread: main
D/RxJava: Received data: This is Data No.20
D/RxJava: Observer Thread: main
D/RxJava: Received data: This is Data No.30
从结果可以看出来,最初的Observable包含3个事件,运行在同一个子线程中,如果是耗时操作,采用同步的方式会浪费大量事件,经过FlatMap转换之后,将每个事件转换为一个新的Observable对象,并指定线程,效率一下提高了3倍!
-
concatMap
这个操作符与FlatMap作用一样,只是,FlatMap转换的事件在发送时并不保证顺序,而concatMap仍然会按原来的顺序发送。 -
filter()
filter用来对发送的数据进行过滤
.filter(new Predicate<Integer>() {
@Override
public boolean test(@NonNull Integer integer) throws Exception {
return false;
}
})
返回值决定了下游观察者是否能够收到数据,true表示能收到,false表示不能接收到。
-
take()
传入一个long数值,表示取前多少个数据。如果传入值大于数据量,会全部发送。另外,它还接收时间参数,表示在多长时间内发送的数据会被接收。 -
doOnNext()
doOnNext()允许我们在每次输出一个元素之前做一些额外的事情,比如缓存,调试,等等。
.observeOn(AndroidSchedulers.mainThread())
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
Log.d("RxJava", "onnext2 Observer Thread: " + Thread.currentThread().getName());
}
})
网友评论