前言
上一篇文章简单介绍了RxJava的使用,本文将在前文的基础上针对一些细节进行说明。
链式编程
链式编程不是什么新鲜的东西,很多框架中的构造者模式都使用了链式编程,RxJava自然也不例外。代码如下:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.e("onSubscribe", "onSubscribe");
}
@Override
public void onNext(Integer integer) {
Log.e("onNext", integer+"");
}
@Override
public void onError(Throwable e) {
Log.e("onError",e.getMessage());
}
@Override
public void onComplete() {
Log.e("onComplete","onComplete");
}
});
结果如下:
06-11 14:11:14.134 21877-21877/com.zhqy.myrxjavademo E/onSubscribe: onSubscribe
06-11 14:11:14.134 21877-21877/com.zhqy.myrxjavademo E/onNext: 1
06-11 14:11:14.134 21877-21877/com.zhqy.myrxjavademo E/onNext: 2
06-11 14:11:14.134 21877-21877/com.zhqy.myrxjavademo E/onNext: 3
06-11 14:11:14.134 21877-21877/com.zhqy.myrxjavademo E/onComplete: onComplete
这里需要说明的是subscribe()有多个重载方法
public final Disposable subscribe() {}
public final Disposable subscribe(Consumer<? super T> onNext) {}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {}
public final void subscribe(Observer<? super T> observer) {}
第一个重载:上游发送的事件下游都不接收
第二个重载:只关心上游onNext()方法发送的事件
第三个重载:只关心上游onNext()和onError()发送的事件
第四个重载:关心上游onNext(),onError(),onComplete()发送的事件
第五个重载:和例子中使用的Observer类似,只是需要分别创建对象来接收各自的事件
线程控制
RxJava作为一个异步库上下游自然能够在进程间进行切换,但需要注意的是线程间的切换也需要遵守一些规则:
默认情况下上下游在同一个进程中,所处进程有所在的进程决定,举例来说,如果你把上下游写在Activity中,那么上下游均处于主线程中。代码如下:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.e("thread", "上游处在"+Thread.currentThread().getName() );
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.e("thread","下游处在"+ Thread.currentThread().getName() );
}
@Override
public void onNext(Integer integer) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
结果如下:
06-11 14:26:04.576 23290-23290/? E/thread: 下游处在main
06-11 14:26:04.576 23290-23290/? E/thread: 上游处在main
结果说明了上下游处在同一线程内。但很多情况下上下游处于不同的线程下,比如说网络请求时请求数据需要在子进程下,而更新UI则需要在UI线程下,这又该如何做呢,其实很简单,只需要设置如下代码
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
其中subscribeOn指定上游所在线程,observeOn指定下游所在进程,结果如下:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.e("thread", "上游处在"+Thread.currentThread().getName() );
}
}).subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.e("thread","下游处在"+ Thread.currentThread().getName() );
}
@Override
public void onNext(Integer integer) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
注意:如果想在Android下使用RxJava需要添加如下依赖
//RxAndroid
implementation 'io.reactivex.rxjava2:rxandroid:2.1.1'
结果如下
06-11 14:34:12.541 23802-23802/com.zhqy.myrxjavademo E/thread: 下游处在main
06-11 14:34:12.543 23802-23830/com.zhqy.myrxjavademo E/thread: 上游处在RxNewThreadScheduler-1
从结果可以看出上下游处于不同的线程中,从而完成不同的功能。
网友评论