前言
最近用Retrofit+RxJava+Protocol Buffer对公司项目的网络请求进行了替换,但感觉RxJava的使用上还不够熟练,在学习了其他大神的入门教程后,为了以后能够有一个方便查找的地方,故有此文。
参考链接:
水管系列
什么是RxJava?
官网上提到了一个词:异步,即一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库。可能对于已经理解并深刻掌握了的人来说,这句话总结的很好,但对不起,我对这一句话不是很理解,但我知道异步,Andorid里有一个AsyncTask和Handler,他们都实现了线程间的切换,也都将异步简洁化,而RxJava作为一个实现了异步的库有什么优势呢:它与众不同之处在于,随着程序逻辑变得越来越复杂,它依然能够保持简洁。(这里的简简洁代表的是理解起来简单,代码量并没有减少,反而可能会增多)。
RxJava的简单使用
首先引入RxJava
dependencies {
//RxJava
implementation "io.reactivex.rxjava2:rxjava:2.2.9"
}
编写代码
//被观察者(上游)
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}
});
//观察者(下游)
Observer observer=new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.e("observer", "onSubscribe" );
}
@Override
public void onNext(Integer integer) {
Log.e("observer", integer+"" );
}
@Override
public void onError(Throwable e) {
Log.e("observer", e.getMessage() );
}
@Override
public void onComplete() {
Log.e("observer", "onComplete" );
}
};
//订阅
observable.subscribe(observer);
打印结果如下:
06-11 11:45:35.234 30502-30502/com.zhqy.myrxjavademo E/observer: onSubscribe
06-11 11:45:35.234 30502-30502/com.zhqy.myrxjavademo E/observer: 1
06-11 11:45:35.234 30502-30502/com.zhqy.myrxjavademo E/observer: 2
06-11 11:45:35.234 30502-30502/com.zhqy.myrxjavademo E/observer: 3
06-11 11:45:35.234 30502-30502/com.zhqy.myrxjavademo E/observer: onComplete
上下游通过subscribe()方法建立起联系后,下游首先调用onSubscribe()方法来建立上下游的关系,随后上游发送1,2,3这三个事件,下游通过onNext方法接收者三个事件后,上游发送onComplete通知下游事件发送完毕,下游的onComplete被调用。这就是上面代码所实现的功能,上游发送事件,下游接收上游发送的事件。
再来介绍一下几个类ObservableEmitter和Disposable ,ObservableEmitter俗称“发射器”,上游的所有事件均有它进行发送,但是事件也不能随便发送,需要满足一定的规则:
1.上游可以发送无限个事件,下游也可以接收无数个事件。
2.上游发送error或者complete事件后,下游将不再接受上游的后续事件,并调用onComplete或者onError方法,但上游的后续事件仍可以继续发送。
3.上游可以不发送onComplete和onError
4.onComplete和onError互斥且唯一
Disposable就相当于一个开关,能够随时切断上下游的联系,使得下游接收不到上游发送的消息,但是需要注意的是Disposable切断上下游的联系不会影响到上游事件的发送,也就是说上游该怎么发送事件还是怎么发送事件。示例如下:
private Disposable disposable;
//被观察者
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.e("onNext",1+"" );
emitter.onNext(1);
Log.e("onNext",1+"" );
emitter.onNext(2);
Log.e("onNext",3+"" );
emitter.onNext(3);
emitter.onComplete();
}
});
//观察者
Observer observer=new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
disposable=d;
Log.e("observer", "onSubscribe" );
}
@Override
public void onNext(Integer integer) {
Log.e("observer", integer+"" );
if (integer==2){
//切断上下游联系
disposable.dispose();
}
}
@Override
public void onError(Throwable e) {
Log.e("observer", e.getMessage() );
}
@Override
public void onComplete() {
Log.e("observer", "onComplete" );
}
};
//订阅
observable.subscribe(observer);
}
结果如下:
06-11 13:52:41.202 19317-19317/? E/observer: onSubscribe
06-11 13:52:41.202 19317-19317/? E/onNext: 1
06-11 13:52:41.202 19317-19317/? E/observer: 1
06-11 13:52:41.202 19317-19317/? E/onNext: 1
06-11 13:52:41.202 19317-19317/? E/observer: 2
06-11 13:52:41.202 19317-19317/? E/onNext: 3
从结果可知,当切断上下游联系后,下游将不再接收上游发送的事件,但上游仍会继续发送事件。
网友评论