美文网首页
RxJava2.0---创建被观察者

RxJava2.0---创建被观察者

作者: 玉树林枫 | 来源:发表于2017-06-25 12:22 被阅读0次

    RxJava2.0的简单使用
    RxJava2.0---创建被观察者
    基于RxJava的事件总线RxBus

    在RxJava中,我们可以创建出各种各样的被观察者(数据源),也对应各种各样的创建方法,下面就一一来解释一下。能力有限,如有错误请指出,谢谢。

    Create

    使用Create操作符从头开始创建一个Observable,给这个操作符传递一个接受观察者作为参数的函数,编写这个函数让它的行为表现为一个Observable--恰当的调用观察者的onNext,onError和onCompleted方法。这里只讲非背压的,背压的也是类型创建方法,也可以点击这里进行查看

     /**
     * 非背压
     * Observable对应Observer
     */
    private void createObservable() {
        //被观察者
        Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                e.onNext("This");
                e.onNext("is");
                e.onNext("RxJava");
                e.onComplete();
                e.onNext("Oh");//就算观察者接收onComplete,
                // 被观察者还是会继续发送数据,只是观察者不接收
                Log.i(TAG, "subscribe: ");
            }
        });
        //观察者
        Observer<String> observer = new Observer<String>() {
            Disposable disposable;
    
            @Override
            public void onSubscribe(Disposable d) {
                disposable = d;
            }
    
            @Override
            public void onNext(String s) {
                Log.i(TAG, "onNext: " + s);
            }
    
            @Override
            public void onError(Throwable e) {
                Log.i(TAG, "onError: " + e.getLocalizedMessage());
            }
    
            @Override
            public void onComplete() {
                Log.i(TAG, "onComplete");
                //取消订阅
                if (!disposable.isDisposed()) {
                    disposable.dispose();
                }
            }
        };
        observable.subscribe(observer);
    }
    

    just、fromArray、fromIterable

    可以将其它种类的对象和数据类型转换为Observable

    /**
     * 三种遍历数组集合的Observable
     */
    private void justFromObservable() {
        Log.i(TAG, "justFrom: ");
        //just
        Observable<Integer> observable = Observable.just(1, 2, 3);
    //        //fromArray
    //        observable = Observable.fromArray(1, 2, 3);
    //
    //        ArrayList<Integer> list = new ArrayList<>();
    //        list.add(1);
    //        //fromIterable
    //        observable = Observable.fromIterable(list);
        observable.subscribe(new Observer<Integer>() {
            Disposable disposable;
    
            @Override
            public void onSubscribe(Disposable d) {
                disposable = d;
            }
    
            @Override
            public void onNext(Integer integer) {
                Log.i(TAG, "onNext: " + integer);
            }
    
            @Override
            public void onError(Throwable e) {
                Log.i(TAG, e.getLocalizedMessage());
            }
    
            @Override
            public void onComplete() {
                Log.i(TAG, "onComplete");
                if (!disposable.isDisposed())
                    disposable.dispose();
            }
        });
    }
    

    defer

    直到有观察者订阅时才创建Observable,并且为每个观察者创建一个新的Observable,例如下面创建deferObservable时传入的num=10的(并非真正创建,订阅时才真正创建),然后在订阅前把num=20,订阅后收到的数据是20,并非10。

     int num = 10;
    
    /**
     * 延迟创建使用defer
     */
    private void deferObservable() {
        Observable<Integer> observable = Observable.just(num);
        //使用num=10来创建deferObservable 
        Observable<Integer> deferObservable = Observable.defer(new Callable<ObservableSource<? extends Integer>>() {
            @Override
            public ObservableSource<? extends Integer> call() throws Exception {
                return Observable.just(num);
            }
        });
        num = 20;
        deferObservable.subscribe(new Observer<Integer>() {
            Disposable disposable;
    
            @Override
            public void onSubscribe(Disposable d) {
                disposable = d;
            }
    
            @Override
            public void onNext(Integer integer) {
                //输出20
                Log.i(TAG, "deferObserve onNext: " + integer);
            }
    
            @Override
            public void onError(Throwable e) {
                Log.i(TAG, e.getLocalizedMessage());
            }
    
            @Override
            public void onComplete() {
                Log.i(TAG, "onComplete");
                if (!disposable.isDisposed())
                    disposable.dispose();
            }
        });
        observable.subscribe(new Observer<Integer>() {
            Disposable disposable;
    
            @Override
            public void onSubscribe(Disposable d) {
                disposable = d;
            }
    
            @Override
            public void onNext(Integer integer) {
                //输出10
                Log.i(TAG, "observable onNext: " + integer);
            }
    
            @Override
            public void onError(Throwable e) {
                Log.i(TAG, e.getLocalizedMessage());
            }
    
            @Override
            public void onComplete() {
                Log.i(TAG, "onComplete");
                if (!disposable.isDisposed())
                    disposable.dispose();
            }
        });
    }
    

    empty、Never、Throw

    1.创建一个不发射任何数据但是正常终止的Observable
    2.创建一个不发射数据也不终止的Observable
    3.创建一个不发射数据以一个错误终止的Observable
    这三个操作符生成的Observable行为非常特殊和受限。测试的时候很有用,有时候也用于结合其它的Observables,或者作为其它需要Observable的操作符的参数。

    private void emptyNeverThrow() {
        Observable.empty().subscribe();
        Observable.never().subscribe();
        Observable.error(new NullPointerException()).subscribe();
    }
    

    interval

    创建一个按固定时间间隔发射整数序列的Observable

    private void timerObservable() {
        Observable.interval(2, TimeUnit.SECONDS).subscribe(new Consumer<Long>() {
            @Override
            public void accept(@NonNull Long aLong) throws Exception {
                Log.i(TAG, Thread.currentThread().getName() + "  accept: " + aLong);
            }
        });
    }
    

    range(1, 10)

    创建一个发射特定整数序列的Observable(这里是1到10)

     private void rangeObservable() {
        Observable.range(1, 10).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(@NonNull Integer integer) throws Exception {
                Log.i(TAG, Thread.currentThread().getName() + "  accept: " + integer);
            }
        });
    }
    

    timer

    在一个给定的延迟后发射一个特殊的值

    private void timerObservable() {
        Observable.timer(2, TimeUnit.SECONDS).subscribe(new Consumer<Long>() {
            @Override
            public void accept(@NonNull Long aLong) throws Exception {
                Log.i(TAG, Thread.currentThread().getName() + "  accept: " + aLong);
            }
        });
    }
    

    repeat、repeatWhen

    创建一个发射特定数据重复多次的Observable,但是repeatWhen它不是缓存和重放原始Observable的数据序列,而是有条件的重新订阅和发射原来的Observable。repeatWhen操作符默认在trampoline调度器上执行。

    private void repeatObservable() {
        Observable<String> observable = Observable.just("AAA", "BBB", "CCC");
    //        observable.repeat(5).subscribe(new Consumer<String>() {
    //            @Override
    //            public void accept(@NonNull String s) throws Exception {
    //                Log.i(TAG, Thread.currentThread().getName() + "   accept: " + s);
    //            }
    //        });
        observable.repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {
            @Override
            public ObservableSource<?> apply(@NonNull Observable<Object> objectObservable) throws Exception {
                return objectObservable;
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(@NonNull String s) throws Exception {
                Log.i(TAG, Thread.currentThread().getName() + "   accept: " + s);
            }
        });
    }

    相关文章

      网友评论

          本文标题:RxJava2.0---创建被观察者

          本文链接:https://www.haomeiwen.com/subject/zwnlcxtx.html