5种被观察者分别是:Observable,Flowable, Single, Completable, Maybe。
五种被观察者可通过toObservable,toFlowable,toSingle,toCompletable,toMaybe相互转换。
(1)Observable
- Observable即被观察者,决定什么时候触发事件以及触发怎样的事件。
- Oberver即观察者,他可以在不同的线程中执行任务,极大的简化了并发操作,因为他创建了一个处于待命状态的观察者,可以在某一时刻响应Observable的通知,而不会造成阻塞。
- ObservableEmitter数据发射器,发射Observable的onNext,onError,onComplete,onSubscribe方法。
- subscribe() 订阅Observable的四个方法,只有调用此方法才会开始发射数据。其有4个构造方法:
subscribe(onNext())
subscribe(onNext(),onError())
subscribe(onNext(),onError(),onComplete())
subscribe(onNext(),onError(),onComplete(),onSubscribe())
具体实现前几篇已经说明了,本篇就不介绍了。
(2)Flowable
可以看成是Observable的实现,只有Flowable支持压背
- Observable:
一般处理不超过1000条数据,几乎不会造成内存溢出
不会背压
处理同步流
- Flowable:
处理超过10KB的数据元素
文件读取与分析
读取数据库
处理网络I/O流
创建一个响应式的非阻塞接口
压背的实现会在后续章节中讲解。
(3)Single
只有onSuccess和onError回调,Single只会发射一次数据
具体实现如下:
Single.create(new SingleOnSubscribe<CountBean>() {
@Override
public void subscribe(SingleEmitter<CountBean> e) throws Exception {
if(!e.isDisposed()){
CountBean countBean = new CountBean();
countBean.setCount(0);
if(countBean.getCount() == 1){
e.onSuccess(countBean);
}else{
e.onError(new Throwable("nullpoint exception"));
}
}
}
}).subscribe(new Consumer<CountBean>() {
@Override
public void accept(CountBean countBean) throws Exception {
System.out.println("count:" + countBean.getCount());
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
System.out.println("exception:" + throwable.getMessage());
}
});
Single.create(new SingleOnSubscribe<CountBean>() {
@Override
public void subscribe(SingleEmitter<CountBean> e) throws Exception {
if(!e.isDisposed()){
CountBean countBean = new CountBean();
countBean.setCount(0);
if(countBean.getCount() == 1){
e.onSuccess(countBean);
}else{
e.onError(new Throwable("nullpoint exception"));
}
}
}
}).subscribe(new SingleObserver<CountBean>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("被观察者和观察者开始连接!");
}
@Override
public void onSuccess(CountBean countBean) {
System.out.println("count:"+countBean.getCount());
}
@Override
public void onError(Throwable e) {
System.out.println("exception:"+e.getMessage());
}
});
(4)Completable
只有onComplete和onError事件, 和Single不同, Completable不发射数据。
具体实现如下:
Completable.create(new CompletableOnSubscribe() {
@Override
public void subscribe(CompletableEmitter e) throws Exception {
if(!e.isDisposed()){
CountBean countBean = new CountBean();
countBean.setCount(0);
if(countBean.getCount() == 1){
e.onComplete();
}else{
e.onError(new Throwable("nullpoint exception"));
}
}
}
}).subscribe(new Action() {
@Override
public void run() throws Exception {
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
}
});
Completable.create(new CompletableOnSubscribe() {
@Override
public void subscribe(CompletableEmitter e) throws Exception {
if(!e.isDisposed()){
CountBean countBean = new CountBean();
countBean.setCount(0);
if(countBean.getCount() == 1){
e.onComplete();
}else{
e.onError(new Throwable("nullpoint exception"));
}
}
}
}).subscribe(new CompletableObserver() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onComplete() {
}
@Override
public void onError(Throwable e) {
}
});
(5)Maybe
没有onNext方法,同样需要onSuccess发射数据,且只能发射0或1个数据,多发也不再处理。
具体实现如下:
Maybe.create(new MaybeOnSubscribe<String>() {
@Override
public void subscribe(MaybeEmitter<String> e) throws Exception {
if(!e.isDisposed()){
CountBean countBean = new CountBean();
countBean.setCount(1);
if(countBean.getCount() == 1){
e.onSuccess("aaaaa");
}else if(countBean.getCount() == 0){
e.onComplete();
}else{
e.onError(new Throwable("nullpoint exception"));
}
}
}
}).subscribe(new MaybeObserver<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onSuccess(String s) {
System.out.println("onSuccess:"+s);
}
@Override
public void onError(Throwable e) {
System.out.println("onError");
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
});
网友评论