美文网首页JAVA相关Android技术知识Android开发
RxJava入门(4):组合合并操作符

RxJava入门(4):组合合并操作符

作者: tmyzh | 来源:发表于2018-02-06 20:29 被阅读25次

    concat/concatArray

    组合多个被观察者一起发送数据,合并后 按发送顺序串行执行
    区别:concat()组合被观察者数量<=4个,concatArray数量大于4个

     Observable.concat(Observable.just(1, 2, 3),
                    Observable.just(4, 5, 6),
                    Observable.just(7, 8, 9),
                    Observable.just(10, 11, 12))
                    .subscribe(new Observer<Integer>() {
                        @Override
                        public void onSubscribe(Disposable d) {
                            Log.e("yzh","onSubscribe");
                        }
    
                        @Override
                        public void onNext(Integer integer) {
                            Log.e("yzh","onNext--"+integer);
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            Log.e("yzh","onError--"+e.toString());
                        }
    
                        @Override
                        public void onComplete() {
                            Log.e("yzh","onComplete");
                        }
                    });
    

    打印结果


    concat
     Observable.concatArray(Observable.just(1, 2, 3),
                    Observable.just(4, 5, 6),
                    Observable.just(7, 8, 9),
                    Observable.just(10, 11, 12),
                    Observable.just(11,12,13))
                    .subscribe(new Observer<Integer>() {
                        @Override
                        public void onSubscribe(Disposable d) {
                            Log.e("yzh","onSubscribe");
                        }
    
                        @Override
                        public void onNext(Integer integer) {
                            Log.e("yzh","onNext--"+integer);
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            Log.e("yzh","onError--"+e.toString());
                        }
    
                        @Override
                        public void onComplete() {
                            Log.e("yzh","onComplete");
                        }
                    });
    

    打印结果


    concatArray

    merge()/mergeArray()

    组合多个被观察者一起发送数据,将同一时刻的事件合并然后发送,再顺序合并下面的事件
    区别与concat/concatArray一样

     Observable.merge(Observable.intervalRange(0,3,1,1, TimeUnit.SECONDS),
                    Observable.intervalRange(2,3,1,1,TimeUnit.SECONDS))
                    .subscribe(new Observer<Long>() {
                        @Override
                        public void onSubscribe(Disposable d) {
                            Log.e("yzh","onSubScribe");
                        }
    
                        @Override
                        public void onNext(Long aLong) {
                            Log.e("yzh","onNext--"+aLong);
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            Log.e("yzh","onError--"+e.toString());
                        }
    
                        @Override
                        public void onComplete() {
                            Log.e("yzh","onComplete");
                        }
                    });
    

    打印结果


    merge

    concatDelayError() / mergeDelayError()

    当合并的被观察中有一个发出onError事件时,其他的被观察者的事件也会被阻止发送,使用上面这两个方法可以将onError事件推迟到其他被观察者发送事件结束后才触发

     Observable.concatArrayDelayError(Observable.create(new ObservableOnSubscribe<Integer>() {
               @Override
               public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                   e.onNext(1);
                   e.onNext(2);
    
                   e.onError(new NullPointerException());
               }
           }),Observable.just(1,2,3))
                   .subscribe(new Observer<Integer>() {
                       @Override
                       public void onSubscribe(Disposable d) {
    
                       }
    
                       @Override
                       public void onNext(Integer serializable) {
                           Log.e("yzh","onNext--"+serializable);
                       }
    
                       @Override
                       public void onError(Throwable e) {
                           Log.e("yzh","onError-"+e.toString());
                       }
    
                       @Override
                       public void onComplete() {
                            Log.e("yzh","onComplete");
                       }
                   });
    

    打印结果


    concatArrayDelayError

    如果直接使用concat结果如下

    onNext--1
    onNext--2
    onError--java.lang.NullPointException
    

    zip

    合并多个被观察者发送的事件,生成一个新的事件序列,然后发送

     Observable<Integer> observable1 =Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                        Log.e("yzh","被观察者1发送事件1");
                        e.onNext(1);
                        Thread.sleep(1000);
                        Log.e("yzh","被观察者1发送事件2");
                        e.onNext(2);
                        Thread.sleep(1000);
    
    //                    e.onComplete();
                }
            }).subscribeOn(Schedulers.io());
            Observable<String> observable2 =Observable.create(new ObservableOnSubscribe<String>() {
                @Override
                public void subscribe(ObservableEmitter<String> e) throws Exception {
                    Log.e("yzh","观察者2发送事件1");
                    e.onNext("a");
                    Thread.sleep(1000);
                    Log.e("yzh","观察者2发送事件2");
                    e.onNext("b");
                    Thread.sleep(1000);
                    Log.e("yzh","被观察者2发送事件3");
                    e.onNext("c");
                    Thread.sleep(1000);
                    e.onComplete();
                }
            }).subscribeOn(Schedulers.newThread());
            Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {
                @Override
                public String apply(Integer integer, String string) throws Exception {
                    Log.e("yzh","apply") ;
                    return  integer + string;
                }
            }).subscribe(new Observer<String>() {
                @Override
                public void onSubscribe(Disposable d) {
                    Log.e("yzh", "onSubscribe");
                }
    
                @Override
                public void onNext(String value) {
                    Log.e("yzh", "onNext =  " + value);
                }
    
                @Override
                public void onError(Throwable e) {
                    Log.e("yzh", "onError");
                }
    
                @Override
                public void onComplete() {
                    Log.e("yzh", "onComplete");
                }
            });
    

    打印结果

    zip
    注意 例子中的两个观察者用subscribeOn使用了不同的 线程,如果不加上这句代码,zip效果与concat一样,可以试一试。

    combineLatest()

    对两个被观察者中的事件组合再发送,特点是将第一个被观察者中最后一个事件分别与另一个被观察者中的事件组合再发送。

    Observable.combineLatest(Observable.just(1L, 2L, 3L),
                    Observable.intervalRange(0, 3, 1, 1, TimeUnit.SECONDS), new BiFunction<Long, Long, Long>() {
                        @Override
                        public Long apply(Long integer, Long aLong) throws Exception {
                            Log.e("yzh","合并的对象--"+integer+"--"+aLong);
                            return integer+aLong;
                        }
                    }).subscribe(new Observer<Long>() {
                @Override
                public void onSubscribe(Disposable d) {
                    Log.e("yzh","onSubscribe");
                }
    
                @Override
                public void onNext(Long s) {
                    Log.e("yzh","onNext--"+s);
                }
    
                @Override
                public void onError(Throwable e) {
                    Log.e("yzh","onError--"+e.toString());
                }
    
                @Override
                public void onComplete() {
                    Log.e("yzh","onComplete");
                }
            });
    

    打印结果


    combineLatest

    reduce()

    把被观察者需要发送的事件聚合成1个事件然后发送,有点斐波那契数列的意思

    Observable.just(1,2,3,4)
                    .reduce(new BiFunction<Integer, Integer, Integer>() {
                        @Override
                        public Integer apply(Integer integer, Integer integer2) throws Exception {
                            Log.e("yzh","操作数据--"+integer+"---"+integer2);
                            return integer*integer2;
                        }
                    }).subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    Log.e("yzh","接受到的数据--"+integer);
                }
            });
    

    打印结果


    reduce

    collect()

    将被观察者Observable发送的数据事件收集到一个数据结构里

     Observable.just(1,2,3,4,5)
                    .collect(new Callable<ArrayList<Integer>>() {
    
                        @Override
                        public ArrayList<Integer> call() throws Exception {
                            return new ArrayList<>();
                        }
                    }, new BiConsumer<ArrayList<Integer>, Integer>() {
                        @Override
                        public void accept(ArrayList<Integer> integers, Integer integer) throws Exception {
                                    integers.add(integer);
                        }
                    }).subscribe(new Consumer<ArrayList<Integer>>() {
                @Override
                public void accept(ArrayList<Integer> integers) throws Exception {
                        Log.e("yzh","accept--"+integers.toString());
                }
            });
    

    打印结果


    collect

    startWith() / startWithArray()

    在一个被观察者发送事件前,追加发送一些数据 / 一个新的被观察者
    注意 后面的方法添加的数据在前面

     Observable.just(4,5,6)
                    .startWith(0)
                    .startWithArray(1,2,3)
                    .subscribe(new Observer<Integer>() {
                        @Override
                        public void onSubscribe(Disposable d) {
                            Log.e("yzh","onSubscribe");
                        }
    
                        @Override
                        public void onNext(Integer integer) {
                            Log.e("yzh","onNext--"+integer);
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            Log.e("yzh","onError--"+e.toString());
                        }
    
                        @Override
                        public void onComplete() {
                            Log.e("yzh","onComplete");
                        }
                    });
    

    打印结果


    startWith

    相关文章

      网友评论

        本文标题:RxJava入门(4):组合合并操作符

        本文链接:https://www.haomeiwen.com/subject/oafnzxtx.html