美文网首页RxJava
RxJava<第五篇>:5种被观察者的创建

RxJava<第五篇>:5种被观察者的创建

作者: NoBugException | 来源:发表于2019-03-13 16:30 被阅读1次

    5种被观察者分别是:Observable,Flowable, Single, Completable, Maybe。
    五种被观察者可通过toObservable,toFlowable,toSingle,toCompletable,toMaybe相互转换。

    (1)Observable

    • Observable即被观察者,决定什么时候触发事件以及触发怎样的事件。
    • Oberver即观察者,他可以在不同的线程中执行任务,极大的简化了并发操作,因为他创建了一个处于待命状态的观察者,可以在某一时刻响应Observable的通知,而不会造成阻塞。
    • ObservableEmitter数据发射器,发射Observable的onNext,onError,onComplete,onSubscribe方法。
    • subscribe() 订阅Observable的四个方法,只有调用此方法才会开始发射数据。其有4个构造方法:
    subscribe(onNext()) 
    subscribe(onNext(),onError()) 
    subscribe(onNext(),onError(),onComplete())     
    subscribe(onNext(),onError(),onComplete(),onSubscribe())
    

    具体实现前几篇已经说明了,本篇就不介绍了。

    (2)Flowable

    可以看成是Observable的实现,只有Flowable支持压背

    • Observable:
      一般处理不超过1000条数据,几乎不会造成内存溢出
      不会背压
      处理同步流
    • Flowable:
      处理超过10KB的数据元素
      文件读取与分析
      读取数据库
      处理网络I/O流
      创建一个响应式的非阻塞接口

    压背的实现会在后续章节中讲解。

    (3)Single

    只有onSuccess和onError回调,Single只会发射一次数据
    具体实现如下:

        Single.create(new SingleOnSubscribe<CountBean>() {
            @Override
            public void subscribe(SingleEmitter<CountBean> e) throws Exception {
                if(!e.isDisposed()){
                    CountBean countBean = new CountBean();
                    countBean.setCount(0);
                    if(countBean.getCount() == 1){
                        e.onSuccess(countBean);
                    }else{
                        e.onError(new Throwable("nullpoint exception"));
                    }
                }
            }
        }).subscribe(new Consumer<CountBean>() {
            @Override
            public void accept(CountBean countBean) throws Exception {
                System.out.println("count:" + countBean.getCount());
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                System.out.println("exception:" + throwable.getMessage());
            }
        });
    
        Single.create(new SingleOnSubscribe<CountBean>() {
            @Override
            public void subscribe(SingleEmitter<CountBean> e) throws Exception {
                if(!e.isDisposed()){
                    CountBean countBean = new CountBean();
                    countBean.setCount(0);
                    if(countBean.getCount() == 1){
                        e.onSuccess(countBean);
                    }else{
                        e.onError(new Throwable("nullpoint exception"));
                    }
                }
            }
        }).subscribe(new SingleObserver<CountBean>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("被观察者和观察者开始连接!");
            }
    
            @Override
            public void onSuccess(CountBean countBean) {
                System.out.println("count:"+countBean.getCount());
            }
    
            @Override
            public void onError(Throwable e) {
                System.out.println("exception:"+e.getMessage());
            }
        });
    

    (4)Completable

    只有onComplete和onError事件, 和Single不同, Completable不发射数据。

    具体实现如下:

        Completable.create(new CompletableOnSubscribe() {
            @Override
            public void subscribe(CompletableEmitter e) throws Exception {
                if(!e.isDisposed()){
                    CountBean countBean = new CountBean();
                    countBean.setCount(0);
                    if(countBean.getCount() == 1){
                        e.onComplete();
                    }else{
                        e.onError(new Throwable("nullpoint exception"));
                    }
                }
            }
        }).subscribe(new Action() {
            @Override
            public void run() throws Exception {
    
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                
            }
        });
    
        Completable.create(new CompletableOnSubscribe() {
            @Override
            public void subscribe(CompletableEmitter e) throws Exception {
                if(!e.isDisposed()){
                    CountBean countBean = new CountBean();
                    countBean.setCount(0);
                    if(countBean.getCount() == 1){
                        e.onComplete();
                    }else{
                        e.onError(new Throwable("nullpoint exception"));
                    }
                }
            }
        }).subscribe(new CompletableObserver() {
            @Override
            public void onSubscribe(Disposable d) {
                
            }
    
            @Override
            public void onComplete() {
    
            }
    
            @Override
            public void onError(Throwable e) {
    
            }
        });
    

    (5)Maybe

    没有onNext方法,同样需要onSuccess发射数据,且只能发射0或1个数据,多发也不再处理。

    具体实现如下:

        Maybe.create(new MaybeOnSubscribe<String>() {
    
            @Override
            public void subscribe(MaybeEmitter<String> e) throws Exception {
                if(!e.isDisposed()){
                    CountBean countBean = new CountBean();
                    countBean.setCount(1);
                    if(countBean.getCount() == 1){
                        e.onSuccess("aaaaa");
                    }else if(countBean.getCount() == 0){
                        e.onComplete();
                    }else{
                        e.onError(new Throwable("nullpoint exception"));
                    }
                }
            }
        }).subscribe(new MaybeObserver<String>() {
    
            @Override
            public void onSubscribe(Disposable d) {
            }
    
            @Override
            public void onSuccess(String s) {
                System.out.println("onSuccess:"+s);
            }
    
            @Override
            public void onError(Throwable e) {
                System.out.println("onError");
            }
    
            @Override
            public void onComplete() {
                System.out.println("onComplete");
            }
        });

    相关文章

      网友评论

        本文标题:RxJava<第五篇>:5种被观察者的创建

        本文链接:https://www.haomeiwen.com/subject/mrctmqtx.html