一、线程调度
public Observable<Integer> getRxJavaCreateExampleData() {
return Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
LogUtils.debug(TAG, "getRxJavaCreateExampleData---:" + Thread.currentThread().getName() + "--:" + 1);
emitter.onNext(1);
LogUtils.debug(TAG, "getRxJavaCreateExampleData---:" + Thread.currentThread().getName() + "--:" + 2);
emitter.onNext(2);
// Thread.sleep(5000);
LogUtils.debug(TAG, "getRxJavaCreateExampleData---:" + Thread.currentThread().getName() + "--:" + 3);
emitter.onNext(3);
emitter.onComplete();
LogUtils.debug(TAG, "getRxJavaCreateExampleData---:" + Thread.currentThread().getName() + "--:" + 4);
emitter.onNext(4);
}
});
}
public void rxJavaSchedulersExample() {
Disposable disposable = model.getRxJavaCreateExampleData()
.subscribeOn(Schedulers.io())
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
LogUtils.error(TAG, "rxJavaSchedulersExample--:" + Thread.currentThread().getName() + "-doOnNext-:" + integer);
}
})
.observeOn(Schedulers.newThread())
.filter(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
LogUtils.error(TAG, "rxJavaSchedulersExample--:" + Thread.currentThread().getName() + "-filter-:" + integer);
return integer > 2;
}
}).observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
LogUtils.error(TAG, "rxJavaSchedulersExample--:" + Thread.currentThread().getName() + "-Consumer-:" + integer);
}
});
compositeDisposable.add(disposable);
}
日志
08-22 11:31:40.252 26676-26852/com.example.zhang D/MainModel: getRxJavaCreateExampleData---:RxCachedThreadScheduler-1--:1
08-22 11:31:40.253 26676-26852/com.example.zhang D/MainModel: getRxJavaCreateExampleData---:RxCachedThreadScheduler-1--:2
08-22 11:31:40.253 26676-26676/com.example.zhang E/MainPresenter: rxJavaSchedulersExample--:main-doOnNext-:1
08-22 11:31:40.254 26676-26852/com.example.zhang D/MainModel: getRxJavaCreateExampleData---:RxCachedThreadScheduler-1--:3
getRxJavaCreateExampleData---:RxCachedThreadScheduler-1--:4
08-22 11:31:40.254 26676-26676/com.example.zhang E/MainPresenter: rxJavaSchedulersExample--:main-doOnNext-:2
rxJavaSchedulersExample--:main-doOnNext-:3
08-22 11:31:40.255 26676-26853/com.example.zhang E/MainPresenter: rxJavaSchedulersExample--:RxNewThreadScheduler-2-filter-:1
rxJavaSchedulersExample--:RxNewThreadScheduler-2-filter-:2
rxJavaSchedulersExample--:RxNewThreadScheduler-2-filter-:3
08-22 11:31:40.255 26676-26676/com.example.zhang E/MainPresenter: rxJavaSchedulersExample--:main-Consumer-:3
总结
1、Schedules线程:① io②newThread③computation ④single⑤trampoline
2、AndroidSchedules线程:AndroidSchedulers.mainThread()
3、subscribeOn 控制的是上游被观察者 ,调用多次,只有第一次起作用;observeOn控制的是下游观察者,可以多次调用
备注
Schedulers.io() 代表io操作的线程, 通常用于网络,读写文件等io密集型的操作
Schedulers.computation() 代表CPU计算密集型的操作, 例如需要大量计算的操作
Schedulers.newThread() 代表一个常规的新线程
AndroidSchedulers.mainThread() 代表Android的主线程
网友评论