前言
RxJava的学习笔记(一)基础概念
RxJava的学习笔记(二)操作符
RxJava的学习笔记(三)线程调度
上一节笔记二中记录了RxJava2常用操作符的使用,在本节中继续学习RxJava2中最强大、最牛逼的地方,那就是可以在各个事件产生和传递的过程中能够自由的切换线程。
要在Android中使用RxAndroid, 先继续添加Gradle配置:
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
1、Scheduler
Scheduler顾名思义被称为线程调度器。有了这个东西能够让RxJava中可以自由的在主线程和子线程之间切换,在RxJava中内置了以下Scheduler线程调度器:
调度器类型 | 用途 |
---|---|
Schedulers.computation() | 用于计算任务,如事件循环或和回调处理,不要用于IO操作(IO操作请使用Schedulers.io());默认线程数等于处理器的数量 |
Schedulers.from(executor) | 使用指定的Executor线程池作为自定义的调度器 |
Schedulers.single() | RxJava2新增调度器,返回一个默认的,共享的,单线程支持的Scheduler实例,用于在同一个后台线程上强制执行的工作。 |
Schedulers.immediate( ) | RxJava1.0里面有该调度器,到2.0之后已被移除。在当前线程立即开始执行任务 |
Schedulers.io( ) | 用于IO密集型任务,如异步阻塞IO操作,这个调度器的线程池会根据需要增长;对于普通的计算任务,请使用Schedulers.computation();Schedulers.io( )默认是一个CachedThreadScheduler,类似一个有线程缓存的新线程调度器 |
Schedulers.newThread( ) | 为每个任务创建一个新线程 |
Schedulers.trampoline( ) | 当其它排队的任务完成后,在当前线程排队开始执行 |
AndroidSchedulers.mainThread() | 此调度器为RxAndroid特有,顾名思义,运行在Android UI线程上 |
2、Scheduler线程调度的实践
在RxJava中可以利用subscribeOn() 结合 observeOn() 来实现线程控制,让事件的产生和消费发生在不同的线程。对于subscribeOn和observeOn方法的使用特点和对应的作用分下面几种情况:
- 1、不进行线程Scheduler切换控制时都是在当前主线程里面运行。
示例:
public static void noScheduler() {
Log.d(XqTag.TAG, "start.thread:" + Thread.currentThread());
Observable.just("data")
.map(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
Log.d(XqTag.TAG, "map0.thread:" + Thread.currentThread());
return s + "-map0";
}
})
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(XqTag.TAG, "onSubscribe.thread:" + Thread.currentThread());
}
@Override
public void onNext(String value) {
Log.d(XqTag.TAG, "onNext:" + value + ".thread:" + Thread.currentThread());
}
@Override
public void onError(Throwable e) {
Log.d(XqTag.TAG, "onError:" + e.toString() + ".thread:" + Thread.currentThread());
}
@Override
public void onComplete() {
Log.d(XqTag.TAG, "onComplete.thread:" + Thread.currentThread());
}
});
}
得到的结果:
08-15 18:30:04.854 10161-10161/cn.jltx.rxjava.rx D/XQTAG: start.thread:Thread[main,5,main]
08-15 18:30:04.863 10161-10161/cn.jltx.rxjava.rx D/XQTAG: onSubscribe.thread:Thread[main,5,main]
08-15 18:30:04.863 10161-10161/cn.jltx.rxjava.rx D/XQTAG: map0.thread:Thread[main,5,main]
08-15 18:30:04.863 10161-10161/cn.jltx.rxjava.rx D/XQTAG: onNext:data-map0.thread:Thread[main,5,main]
08-15 18:30:04.863 10161-10161/cn.jltx.rxjava.rx D/XQTAG: onComplete.thread:Thread[main,5,main]
- 2、subscribeOn无论放置在哪个位置,它的线程切换发生在该Observable的OnSubscribe 中,即在它通知上一级 OnSubscribe 时,这时事件还没有开始发送,因此 subscribeOn() 的线程控制可以从事件发出的开端就造成影响。当使用多个subscribeOn() 的时候,只有第一个 subscribeOn() 起作用。
示例:
public static void subscribeOnScheduler() {
Log.d(XqTag.TAG, "start.thread:" + Thread.currentThread());
Observable.just("data")
.subscribeOn(Schedulers.io())
.map(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
Log.d(XqTag.TAG, "map0.thread:" + Thread.currentThread());
return s + "-map0";
}
})
.subscribeOn(Schedulers.newThread())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(XqTag.TAG, "onSubscribe.thread:" + Thread.currentThread());
}
@Override
public void onNext(String value) {
Log.d(XqTag.TAG, "onNext:" + value + ".thread:" + Thread.currentThread());
}
@Override
public void onError(Throwable e) {
Log.d(XqTag.TAG, "onError:" + e.toString() + ".thread:" + Thread.currentThread());
}
@Override
public void onComplete() {
Log.d(XqTag.TAG, "onComplete.thread:" + Thread.currentThread());
}
});
}
得到的结果是:
08-15 18:33:31.654 10161-10161/cn.jltx.rxjava.rx D/XQTAG: start.thread:Thread[main,5,main]
08-15 18:33:31.671 10161-10161/cn.jltx.rxjava.rx D/XQTAG: onSubscribe.thread:Thread[main,5,main]
08-15 18:33:31.695 10161-10499/cn.jltx.rxjava.rx D/XQTAG: map0.thread:Thread[RxCachedThreadScheduler-1,5,main]
08-15 18:33:31.695 10161-10499/cn.jltx.rxjava.rx D/XQTAG: onNext:data-map0.thread:Thread[RxCachedThreadScheduler-1,5,main]
08-15 18:33:31.695 10161-10499/cn.jltx.rxjava.rx D/XQTAG: onComplete.thread:Thread[RxCachedThreadScheduler-1,5,main]
只有第一个subscribeOn()生效,而最后的subscribeOn就没有效果了。
- 3、通过observeOn进行线程切换时,发生在自己内部构建的Observable被订阅者中,也就是在它即将给下一级的订阅者发送事件时,因此,ObserveOn()控制的是后面的线程,可以多个ObserveOn()一起使用。每使用一次ObserveOn(),它后面的线程就跟着变换一次。
示例:
public static void observeOnScheduler() {
Log.d(XqTag.TAG, "start.thread:" + Thread.currentThread());
Observable.just("data")
.observeOn(Schedulers.io())
.map(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
Log.d(XqTag.TAG, "map0.thread:" + Thread.currentThread());
return s + "-map0";
}
})
.observeOn(Schedulers.computation())
.observeOn(Schedulers.newThread())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(XqTag.TAG, "onSubscribe.thread:" + Thread.currentThread());
}
@Override
public void onNext(String value) {
Log.d(XqTag.TAG, "onNext:" + value + ".thread:" + Thread.currentThread());
}
@Override
public void onError(Throwable e) {
Log.d(XqTag.TAG, "onError:" + e.toString() + ".thread:" + Thread.currentThread());
}
@Override
public void onComplete() {
Log.d(XqTag.TAG, "onComplete.thread:" + Thread.currentThread());
}
});
}
得到的结果是:
08-15 10:52:51.998 2498-2498/cn.jltx.rxjava.rx D/XQTAG: start.thread:Thread[main,5,main]
08-15 10:52:52.003 2498-2498/cn.jltx.rxjava.rx D/XQTAG: onSubscribe.thread:Thread[main,5,main]
08-15 10:52:52.004 2498-2577/cn.jltx.rxjava.rx D/XQTAG: map0.thread:Thread[RxCachedThreadScheduler-1,5,main]
08-15 10:52:52.018 2498-2579/cn.jltx.rxjava.rx D/XQTAG: onNext:data-map0.thread:Thread[RxNewThreadScheduler-1,5,main]
08-15 10:52:52.018 2498-2579/cn.jltx.rxjava.rx D/XQTAG: onComplete.thread:Thread[RxNewThreadScheduler-1,5,main]
- 4、下游的onSubscrible的回调是在subscrible()订阅动作的时候就被调用了,因此不能先直接指定onSubscrible执行的线程,而只能在subscribe()被调用时的线程中。
- 5、对于设置doOnSubscribe回调默认跟onSubscribe的回调规则那样在执行subscribe()订阅动作的时候被调用了,但是如果在 doOnSubscribe() 之后有 subscribeOn() ,它将执行在离它最近的 subscribeOn() 所指定的线程。
示例:
public static void subscribeAndobserveOnScheduler() {
Log.d(XqTag.TAG, "start.thread:" + Thread.currentThread());
Observable.just("data")
.observeOn(Schedulers.io())
.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
Log.d(XqTag.TAG, "doOnSubscribe.thread:" + Thread.currentThread());
}
})
//doOnSubscribe后面设置了subscribeOn后执行离它最近的subscribeOn所指定的线程
.subscribeOn(Schedulers.newThread())
.subscribeOn(Schedulers.io())
.flatMap(new Function<String, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(String s) throws Exception {
Log.d(XqTag.TAG, "map0.thread:" + Thread.currentThread());
return Observable.just(s + "-map0");
}
})
.observeOn(Schedulers.computation())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(XqTag.TAG, "onSubscribe.thread:" + Thread.currentThread());
}
@Override
public void onNext(String value) {
Log.d(XqTag.TAG, "onNext:" + value + ".thread:" + Thread.currentThread());
}
@Override
public void onError(Throwable e) {
Log.d(XqTag.TAG, "onError:" + e.toString() + ".thread:" + Thread.currentThread());
}
@Override
public void onComplete() {
Log.d(XqTag.TAG, "onComplete.thread:" + Thread.currentThread());
}
});
}
得到结果是:
08-15 10:58:41.261 2498-2498/cn.jltx.rxjava.rx D/XQTAG: start.thread:Thread[main,5,main]
08-15 10:58:41.267 2498-2498/cn.jltx.rxjava.rx D/XQTAG: onSubscribe.thread:Thread[main,5,main]
08-15 10:58:41.268 2498-2606/cn.jltx.rxjava.rx D/XQTAG: doOnSubscribe.thread:Thread[RxNewThreadScheduler-2,5,main]
08-15 10:58:41.268 2498-2605/cn.jltx.rxjava.rx D/XQTAG: map0.thread:Thread[RxCachedThreadScheduler-2,5,main]
08-15 10:58:41.270 2498-2607/cn.jltx.rxjava.rx D/XQTAG: onNext:data-map0.thread:Thread[RxComputationThreadPool-2,5,main]
08-15 10:58:41.270 2498-2607/cn.jltx.rxjava.rx D/XQTAG: onComplete.thread:Thread[RxComputationThreadPool-2,5,main]
- 6、subscribeOn和observeOn嵌套以及多个Observable联合使用时,一层一层的线程由父Observable到子Observable的切换,同时也可以在doOnSubscribe()、doOnNext()、doOnComplete()这些方法前后自由通过observeOn()切换这些方法回调的线程。
示例:
public static void subscribeAndobserveOnNestScheduler() {
Log.d(XqTag.TAG, "start.thread:" + Thread.currentThread());
Observable.just("data")
.observeOn(Schedulers.io())
.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
Log.d(XqTag.TAG, "doOnSubscribe.thread:" + Thread.currentThread());
}
})
//doOnSubscribe后面设置了subscribeOn后执行离它最近的subscribeOn所指定的线程
.subscribeOn(Schedulers.newThread())
.subscribeOn(Schedulers.io())
.map(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
Log.d(XqTag.TAG, "map0.thread:" + Thread.currentThread());
return s + "-map0";
}
})
.observeOn(Schedulers.newThread())
.flatMap(new Function<String, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(String s) throws Exception {
Log.d(XqTag.TAG, "flatMap1.thread:" + Thread.currentThread());
Observable<String> flatMapObservable = Observable.just(s + "-flatMap1")
.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
Log.d(XqTag.TAG, "flatMapObservable.doOnSubscribe.thread:" + Thread.currentThread());
}
})
.subscribeOn(AndroidSchedulers.mainThread())
.observeOn(Schedulers.io())
.map(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
Log.d(XqTag.TAG, "apply.map2.thread:" + Thread.currentThread());
return s + "-map2";
}
})
.observeOn(Schedulers.single())
.doOnNext(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(XqTag.TAG, "doOnNext.accept" + s + ".map2.thread:" + Thread.currentThread());
}
})
.observeOn(Schedulers.newThread())
.doOnComplete(new Action() {
@Override
public void run() throws Exception {
Log.d(XqTag.TAG, "doOnComplete.accept" + s + ".map2.thread:" + Thread.currentThread());
}
});
return flatMapObservable;
}
})
.observeOn(Schedulers.computation())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(XqTag.TAG, "onSubscribe.thread:" + Thread.currentThread());
}
@Override
public void onNext(String value) {
Log.d(XqTag.TAG, "onNext:" + value + ".thread:" + Thread.currentThread());
}
@Override
public void onError(Throwable e) {
Log.d(XqTag.TAG, "onError:" + e.toString() + ".thread:" + Thread.currentThread());
}
@Override
public void onComplete() {
Log.d(XqTag.TAG, "onComplete.thread:" + Thread.currentThread());
}
});
}
得到的结果是:
08-15 11:08:09.277 2498-2498/cn.jltx.rxjava.rx D/XQTAG: start.thread:Thread[main,5,main]
08-15 11:08:09.277 2498-2498/cn.jltx.rxjava.rx D/XQTAG: onSubscribe.thread:Thread[main,5,main]
08-15 11:08:09.279 2498-2646/cn.jltx.rxjava.rx D/XQTAG: doOnSubscribe.thread:Thread[RxNewThreadScheduler-3,5,main]
08-15 11:08:09.279 2498-2645/cn.jltx.rxjava.rx D/XQTAG: map0.thread:Thread[RxCachedThreadScheduler-3,5,main]
08-15 11:08:09.280 2498-2647/cn.jltx.rxjava.rx D/XQTAG: flatMap1.thread:Thread[RxNewThreadScheduler-4,5,main]
08-15 11:08:09.282 2498-2498/cn.jltx.rxjava.rx D/XQTAG: flatMapObservable.doOnSubscribe.thread:Thread[main,5,main]
08-15 11:08:09.283 2498-2645/cn.jltx.rxjava.rx D/XQTAG: apply.map2.thread:Thread[RxCachedThreadScheduler-3,5,main]
08-15 11:08:09.284 2498-2648/cn.jltx.rxjava.rx D/XQTAG: doOnNext.acceptdata-map0-flatMap1-map2.map2.thread:Thread[RxSingleScheduler-1,5,main]
08-15 11:08:09.287 2498-2649/cn.jltx.rxjava.rx D/XQTAG: doOnComplete.acceptdata-map0.map2.thread:Thread[RxNewThreadScheduler-5,5,main]
08-15 11:08:09.299 2498-2650/cn.jltx.rxjava.rx D/XQTAG: onNext:data-map0-flatMap1-map2.thread:Thread[RxComputationThreadPool-3,5,main]
08-15 11:08:09.299 2498-2650/cn.jltx.rxjava.rx D/XQTAG: onComplete.thread:Thread[RxComputationThreadPool-3,5,main]
写在最后
每天坚持写点日志心得。
最后附上源码:
https://git.oschina.net/jltx/RxJavaRetrofitDemoPro
网友评论