美文网首页
7. Rxjava2 : 合并操作符

7. Rxjava2 : 合并操作符

作者: 0青衣小褂0 | 来源:发表于2019-02-13 10:56 被阅读76次

    1. RxJava2 : 什么是观察者模式
    2. RxJava2 : 创建操作符(无关时间)
    3. Rxjava2 : 创建操作符(有关时间)
    4. Rxjava2 : 变换操作符
    5. Rxjava2 : 判断操作符
    6. Rxjava2 : 筛选操作符
    7. Rxjava2 : 合并操作符
    8. Rxjava2 : do操作符
    9. Rxjava2 : error处理
    10. Rxjava2 : 重试
    11. Rxjava2 : 线程切换

    api use
    concat / concatArray {{concat}}
    concatDelayError {{concatDelayError}}
    merge / mergeArray {{merge}}
    mergeDelayError {{mergeDelayError}}
    zip {{zip}}
    reduce {{reduce}}
    collect {collect}
    startWith / startWithArray {{startWith}}

    concat / concatArray

    • concat
      1.concat的参数不得超过4个(不能超过4个Observable)
      2.concat是按照顺序合并的
    • concatArray
      concatArray如果参数超过4个(就将Observable组成集合传入),剩下的均与concat相同

    流程

    3个Observable一共有9个元素
    Observable0(元素1,元素2,元素3)
    Observable1(元素4,元素5,元素6)
    Observable2(元素7,元素8,元素9)
    ->
    按照顺序合并元素
    (元素1,元素2,元素3,元素4,元素5,元素6,元素7,元素8,元素9)
    ->
    subscribe
    ->
    observer 观察到9个元素
    
            Observable<Integer> observable0 = Observable.just(1, 2, 3);
            Observable<Integer> observable1 = Observable.just(4, 5, 6);
            Observable<String> observable2 = Observable.just("7", "8", "9");
            Observable.concat(observable0, observable1, observable2)
                    .subscribe(new Observer<Serializable>() {
                        @Override
                        public void onSubscribe(Disposable d) {
    
                        }
    
                        @Override
                        public void onNext(Serializable serializable) {
                            if (serializable instanceof Integer) {
                                Log.d(TAG, "Integer:" + serializable);
                            } else if (serializable instanceof String) {
                                Log.d(TAG, "String:" + serializable);
                            }
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            Log.d(TAG, "onError");
                        }
    
                        @Override
                        public void onComplete() {
                            Log.d(TAG, "onComplete");
                        }
                    });
    

    log

    02-12 17:42:57.603 6230-6230/... D/SplashActivity: Integer:1
    02-12 17:42:57.603 6230-6230/... D/SplashActivity: Integer:2
    02-12 17:42:57.603 6230-6230/... D/SplashActivity: Integer:3
    02-12 17:42:57.603 6230-6230/... D/SplashActivity: Integer:4
    02-12 17:42:57.603 6230-6230/... D/SplashActivity: Integer:5
    02-12 17:42:57.603 6230-6230/... D/SplashActivity: Integer:6
    02-12 17:42:57.603 6230-6230/... D/SplashActivity: String:7
    02-12 17:42:57.603 6230-6230/... D/SplashActivity: String:8
    02-12 17:42:57.603 6230-6230/... D/SplashActivity: String:9
    02-12 17:42:57.603 6230-6230/... D/SplashActivity: onComplete
    

    concatDelayError

    • concatDelayError
      1.将onError放在了最后执行,那么onError就无法作为结束标识了,只能依靠onComplete作为结束标识
      2.能否执行onError取决于能否找到onComplete标识
      3.采用concat的方式,即按照元素顺序合并,Observable0的complete失去作用,能否执行onError就取决于Observable1是否有结束标识
      4.以onError结束,则不会执行onComplete

    流程

    Observable
    ->
    Observable0(元素1,元素2,元素3,throwable,complete)
    Observable1(元素4,元素5,complete)
    ->
    元素1,元素2,元素3,元素4,元素5,complete
    ->
    元素1,元素2,元素3,元素4,元素5,throwable
    ->
    subscribe
    -> 
    observer
    ->
    log throwable
    
    //Observable1具有e.onComplete()结束标识
     Observable<Integer> observable = Observable.create(e -> {
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
                e.onError(new NullPointerException());
                e.onComplete();
            });
            Observable<Integer> observable2 = Observable.create(e -> {
                e.onNext(4);
                e.onNext(5);
                e.onComplete();
            });
            List<Observable<Integer>> objects = new ArrayList<>();
            objects.add(observable);
            objects.add(observable2);
            Observable.concatDelayError(objects)
                    .subscribe(new Observer<Integer>() {
                        @Override
                        public void onSubscribe(Disposable d) {
    
                        }
    
                        @Override
                        public void onNext(Integer integer) {
                            Log.d(TAG, "integer:" + integer);
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            Log.d(TAG, "onError");
                        }
    
                        @Override
                        public void onComplete() {
                            Log.d(TAG, "onComplete");
                        }
                    });
    

    log

    02-12 17:57:01.083 7281-7281/... D/SplashActivity: integer:1
    02-12 17:57:01.083 7281-7281/... D/SplashActivity: integer:2
    02-12 17:57:01.083 7281-7281/... D/SplashActivity: integer:3
    02-12 17:57:01.093 7281-7281/... D/SplashActivity: integer:4
    02-12 17:57:01.093 7281-7281/... D/SplashActivity: integer:5
    02-12 17:57:01.093 7281-7281/... D/SplashActivity: onError
    
    //Observable1不具有e.onComplete()结束标识
    Observable<Integer> observable = Observable.create(e -> {
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
                e.onError(new NullPointerException());
                e.onComplete();
            });
            Observable<Integer> observable2 = Observable.create(e -> {
                e.onNext(4);
                e.onNext(5);
            });
            List<Observable<Integer>> objects = new ArrayList<>();
            objects.add(observable);
            objects.add(observable2);
            Observable.concatDelayError(objects)
                    .subscribe(new Observer<Integer>() {
                        @Override
                        public void onSubscribe(Disposable d) {
    
                        }
    
                        @Override
                        public void onNext(Integer integer) {
                            Log.d(TAG, "integer:" + integer);
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            Log.d(TAG, "onError");
                        }
    
                        @Override
                        public void onComplete() {
                            Log.d(TAG, "onComplete");
                        }
                    });
    

    log

    02-12 18:03:27.353 8228-8228/... D/SplashActivity: integer:1
    02-12 18:03:27.353 8228-8228/... D/SplashActivity: integer:2
    02-12 18:03:27.353 8228-8228/... D/SplashActivity: integer:3
    02-12 18:03:27.353 8228-8228/... D/SplashActivity: integer:4
    02-12 18:03:27.353 8228-8228/... D/SplashActivity: integer:5
    

    merge

    • merge
      1.merge的参数不得超过4个(不能超过4个Observable)
      2.merge是按照时间合并的
    • mergeArray
      mergeArray如果参数超过4个(就将Observable组成集合传入),剩下的均与concat相同

    流程

    时间(秒) 0 1 2 3 4 5 6 7 8
    Observable0 没发送 没发送 1 没发送 没发送 2 没发送 没发送 3
    Observable1 没发送 4 没发送 没发送 5 没发送 没发送 6 没发送
    Observable2 没发送 7 没发送 没发送 8 没发送 没发送 9 没发送

    依照时间来合并:
    则是 4,7,1,5,8,2,6,9,3

    Log.d(TAG, "in");
            Observable<Long> longObservable = Observable
                    .intervalRange(1, 3, 2, 2, TimeUnit.SECONDS);
            Observable<Long> longObservable1 = Observable
                    .intervalRange(4, 3, 1, 2, TimeUnit.SECONDS);
            Observable<Long> longObservable2 = Observable
                    .intervalRange(7, 3, 1, 2, TimeUnit.SECONDS);
            Observable
                    .merge(longObservable, longObservable1,longObservable2)
                    .subscribe(new Observer<Long>() {
                        @Override
                        public void onSubscribe(Disposable d) {
    
                        }
    
                        @Override
                        public void onNext(Long aLong) {
                            Log.d(TAG, "aLong:" + aLong);
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            Log.d(TAG, "onError");
                        }
    
                        @Override
                        public void onComplete() {
                            Log.d(TAG, "onComplete");
                        }
                    });
    

    log

    02-13 09:33:04.121 13228-13228/... D/SplashActivity: in
    02-13 09:33:05.121 13228-13271/... D/SplashActivity: aLong:4
    02-13 09:33:05.121 13228-13272/... D/SplashActivity: aLong:7
    02-13 09:33:06.131 13228-13270/... D/SplashActivity: aLong:1
    02-13 09:33:07.131 13228-13271/... D/SplashActivity: aLong:5
    02-13 09:33:07.131 13228-13272/... D/SplashActivity: aLong:8
    02-13 09:33:08.131 13228-13270/... D/SplashActivity: aLong:2
    02-13 09:33:09.131 13228-13271/... D/SplashActivity: aLong:6
    02-13 09:33:09.131 13228-13272/... D/SplashActivity: aLong:9
    02-13 09:33:10.121 13228-13270/... D/SplashActivity: aLong:3
    02-13 09:33:10.121 13228-13270/... D/SplashActivity: onComplete
    

    mergeDelayError

    • mergeDelayError
      与concatDelayError类似,onError会在最后接收到
      区别在于:
      concatDelayError:
      会以最后一个onComplete作为结束的标识
      mergeDelayError:
      在时间过程当中,哪个Observable出现了error,哪个Observable就不再继续发送了,直至所有Observable的元素都结束之后,处理onError

    流程

    时间 0 1 2
    Observable0 1 throwable 2
    Observable1 3 4 5,complete
    合并 1,3 throwable,4 error结束,5,complete

    则最后的结果为: 1,3,4,5,onError;

    注意:在示例中,我采取了指定不同的线程做延时,如果在相同线程中,会出现卡线程的情况,会影响到结果
    
    Log.d(TAG, "in");
            Observable<Long> longObservable = Observable.create((ObservableOnSubscribe<Long>) e -> {
                e.onNext(1L);
                Thread.sleep(1000);
                e.onError(new NullPointerException());
                Thread.sleep(1000);
                e.onNext(2L);
                e.onComplete();
            }).subscribeOn(Schedulers.newThread());
            Observable<Long> longObservable1 = Observable.create((ObservableOnSubscribe<Long>) e -> {
                e.onNext(3L);
                Thread.sleep(1000);
                e.onNext(4L);
                Thread.sleep(1000);
                e.onNext(5L);
                e.onComplete();
            }).subscribeOn(Schedulers.newThread());
            Observable
                    .mergeDelayError(longObservable, longObservable1)
                    .subscribe(new Observer<Long>() {
                        @Override
                        public void onSubscribe(Disposable d) {
    
                        }
    
                        @Override
                        public void onNext(Long aLong) {
                            Log.d(TAG, "aLong:" + aLong);
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            Log.d(TAG, "onError");
                        }
    
                        @Override
                        public void onComplete() {
                            Log.d(TAG, "onComplete");
                        }
                    });
    

    log

    02-13 10:03:26.701 16418-16418/... D/SplashActivity: in
    02-13 10:03:26.711 16418-16463/... D/SplashActivity: aLong:1
    02-13 10:03:26.711 16418-16464/... D/SplashActivity: aLong:3
    02-13 10:03:27.721 16418-16464/... D/SplashActivity: aLong:4
    02-13 10:03:28.711 16418-16464/... D/SplashActivity: aLong:5
    02-13 10:03:28.711 16418-16464/... D/SplashActivity: onError
    

    zip

    • zip
      必须严格遵守一一对应的关系

    流程

    observable 元素 元素 元素 元素
    observable0 1 2 3
    observable1 4 5 onComplete 6
    zip合并(相加) 5 7 onComplete 碰到onComplete标识,已经结束

    最终的结果 5,7

    Observable<Integer> observable = Observable.create(e -> {
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
            });
            Observable<Integer> observable1 = Observable.create(e -> {
                e.onNext(4);
                e.onNext(5);
                e.onComplete();
                e.onNext(6);
            });
            Observable
                    .zip(observable, observable1,
                            (integer, integer2) -> integer + integer2)
                    .subscribe(new Observer<Integer>() {
                        @Override
                        public void onSubscribe(Disposable d) {
    
                        }
    
                        @Override
                        public void onNext(Integer integer) {
                            Log.d(TAG, "integer:" + integer);
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            Log.d(TAG, "onError");
                        }
    
                        @Override
                        public void onComplete() {
                            Log.d(TAG, "onComplete");
                        }
                    });
    

    log

    02-01 11:25:53.581 11233-11233/... D/ShellAppApplication: integer:5
    02-01 11:25:53.581 11233-11233/... D/ShellAppApplication: integer:7
    
    Observable<Integer> observable = Observable.create(e -> {
                e.onError(new NullPointerException());
                e.onNext(2);
                e.onNext(3);
            });
            Observable<Integer> observable1 = Observable.create(e -> {
                e.onNext(4);
                e.onNext(5);
                e.onComplete();
                e.onNext(6);
            });
            Observable
                    .zip(observable, observable1,
                            (integer, integer2) ->  integer + integer2)
                    .subscribe(new Observer<Integer>() {
                        @Override
                        public void onSubscribe(Disposable d) {
    
                        }
    
                        @Override
                        public void onNext(Integer integer) {
                            Log.d(TAG, "integer:" + integer);
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            Log.d(TAG, "onError");
                        }
    
                        @Override
                        public void onComplete() {
                            Log.d(TAG, "onComplete");
                        }
                    });
    

    log

    02-13 10:34:54.131 17980-17980/... D/SplashActivity: onError
    

    reduce

    • reduce
      对于同一个Observable当中的所有元素逐一处理,合并为同一个元素

    流程

    observable(元素1,元素2,元素3)
    ->
    reduce
    ->
    apply (元素1,元素2) -> 元素X
    apply(元素X,元素3) -> (元素X +元素3) 作为新的元素X 
    递归,直至最后一个
    
    -> 
    one observable
    ->
    subscribe
    
    Disposable subscribe = Observable.just(1, 2, 3).reduce((integer, integer2) -> {
                Log.d(TAG, "first:" + integer);
                Log.d(TAG, "second:" + integer2);
                return integer + integer2;
            }).subscribe(integer -> Log.d(TAG, "integer:" + integer));
    

    log

    02-01 15:41:32.261 4709-4709/... D/ShellAppApplication: first:1
    02-01 15:41:32.261 4709-4709/... D/ShellAppApplication: second:2
    02-01 15:41:32.261 4709-4709/... D/ShellAppApplication: first:3
    02-01 15:41:32.261 4709-4709/... D/ShellAppApplication: second:3
    02-01 15:41:32.261 4709-4709/... D/ShellAppApplication: integer:6
    

    collect

    • collect
      收集
      1.指定一个容器
      2.将所有的元素收集到这个容器当中
    Disposable subscribe = Observable.just(1, 2, 3)
                    .collect(new Callable<List<Integer>>() {
                        @Override
                        public List<Integer> call() throws Exception {
                            return new ArrayList<>();
                        }
                    }, new BiConsumer<List<Integer>, Integer>() {
                        @Override
                        public void accept(List<Integer> list, Integer integer) throws Exception {
                            list.add(integer);
                        }
                    })
                    .subscribe(new Consumer<List<Integer>>() {
                        @Override
                        public void accept(List<Integer> integers) throws Exception {
                            Log.d(TAG, "integers:" + integers);
                        }
                    });
    

    log

    02-13 10:49:49.061 18664-18664/... D/SplashActivity: integers:[1, 2, 3]
    

    startWith

    • startWith
      在最开始的时候插入
      1.支持插入单一元素
      2.支持插入Observable
    Observable.just(1, 2, 3)
                    .startWith(0)
                    .subscribe(new Observer<Integer>() {
                        @Override
                        public void onSubscribe(Disposable d) {
    
                        }
    
                        @Override
                        public void onNext(Integer integer) {
                            Log.d(TAG, "integer:" + integer);
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            Log.d(TAG, "onError");
                        }
    
                        @Override
                        public void onComplete() {
                            Log.d(TAG, "onComplete");
                        }
                    });
    

    log

    02-13 10:53:59.151 18992-18992/... D/SplashActivity: integer:0
    02-13 10:53:59.151 18992-18992/... D/SplashActivity: integer:1
    02-13 10:53:59.151 18992-18992/... D/SplashActivity: integer:2
    02-13 10:53:59.151 18992-18992/... D/SplashActivity: integer:3
    02-13 10:53:59.151 18992-18992/... D/SplashActivity: onComplete
    
    Observable.just(1, 2, 3)
                    .startWith(Observable.just(4, 5, 6))
                    .subscribe(new Observer<Integer>() {
                        @Override
                        public void onSubscribe(Disposable d) {
    
                        }
    
                        @Override
                        public void onNext(Integer integer) {
                            Log.d(TAG, "integer:" + integer);
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            Log.d(TAG, "onError");
                        }
    
                        @Override
                        public void onComplete() {
                            Log.d(TAG, "onComplete");
                        }
                    });
    

    log

    02-13 10:55:14.241 19238-19238/... D/SplashActivity: integer:4
    02-13 10:55:14.241 19238-19238/... D/SplashActivity: integer:5
    02-13 10:55:14.241 19238-19238/... D/SplashActivity: integer:6
    02-13 10:55:14.241 19238-19238/... D/SplashActivity: integer:1
    02-13 10:55:14.241 19238-19238/... D/SplashActivity: integer:2
    02-13 10:55:14.241 19238-19238/... D/SplashActivity: integer:3
    02-13 10:55:14.241 19238-19238/... D/SplashActivity: onComplete
    

    相关文章

      网友评论

          本文标题:7. Rxjava2 : 合并操作符

          本文链接:https://www.haomeiwen.com/subject/kdikeqtx.html