RxJava2.0最好的学习资料当然是官方资料
传送门:https://github.com/ReactiveX/RxJava
RxJava2.0基本类:
-
io.reactivex.Flowable
: 0..N flows, supporting Reactive-Streams and backpressure -
io.reactivex.Observable
: 0..N flows, no backpressure, -
io.reactivex.Single
: a flow of exactly 1 item or an error, -
io.reactivex.Completable
: a flow without items but only a completion or error signal, -
io.reactivex.Maybe
: a flow with no items, exactly one item or an error.
背压 Backpressure
官方文档:https://github.com/ReactiveX/RxJava/wiki/Backpressure
对于Backpressure的简单理解:背压是指在异步场景中,被观察者发送事件速度远快于观察者的处理速度的情况下,一种告诉上游的被观察者降低发送速度的策略。
观察者模式
一、Observable/Observer
1、官方介绍:http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Observable.html
此观察者模式不支持背压
2、code
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("回手,掏~");
e.onNext("吆~鬼刀一开看不见~");
e.onNext("走位走位~");
e.onNext("手里干~");
e.onNext("哈哈哈哈哈哈哈~");
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.io())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable disposable) {
}
@Override
public void onNext(String string) {
Log.d("TAG", string + "\n");
}
@Override
public void onError(Throwable throwable) {
Log.d("TAG", "onError:" + throwable.getMessage());
}
@Override
public void onComplete() {
Log.d("TAG", "onComplete:");
}
});
二、Flowable/Subscriber
1、官方介绍:http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html
次观察者模式支持背压
Flowable可以通过create()来创建,但是必须指定背压的策略,以保证你创建的Flowable是支持背压的。
2、code
Flowable.create(new FlowableOnSubscribe<String>() {
@Override
public void subscribe(FlowableEmitter<String> flowableEmitter) {
flowableEmitter.onNext("回手,掏~");
flowableEmitter.onNext("吆~鬼刀一开看不见~");
flowableEmitter.onNext("走位走位~");
flowableEmitter.onNext("手里干~");
flowableEmitter.onNext("哈哈哈哈哈哈哈~");
flowableEmitter.onComplete();
}
},BackpressureStrategy.BUFFER)
.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.io())
.subscribe(new Subscriber<String>() {
Subscription sub;
@Override
public void onSubscribe(Subscription s) {
Log.d("TAG","onsubscribe start");
sub=s;
sub.request(1);
Log.d("TAG","onsubscribe end");
}
@Override
public void onNext(String string) {
Log.d("TAG", string + "\n");
sub.request(1);
}
@Override
public void onError(Throwable throwable) {
Log.d("TAG", "onError:" + throwable.getMessage());
}
@Override
public void onComplete() {
Log.d("TAG", "onComplete:");
}
});
三、Maybe/MaybeObserver
1、官方介绍http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Maybe.html
Maybe/MaybeObserver是 Single、Completable两者的复合体,因此以Maybe/MaybeObserver为例简单介绍一下这种观察者模式的用法。
实际上,这种观察者模式并不用于发送大量数据,而是发送单个数据,也就是说,当你只想要某个事件的结果(true or false)的时候,你可以用这种观察者模式。
2、code
//判断是否登陆
Maybe.just(isLogin())
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new MaybeObserver<Boolean>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onSuccess(Boolean value) {
if(value){
...
}else{
...
}
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
网友评论