美文网首页
Rxjava(四)之合并操作符与异常处理操作符

Rxjava(四)之合并操作符与异常处理操作符

作者: 梦星夜雨 | 来源:发表于2021-01-26 10:35 被阅读0次

前言

上文我们介绍了过滤操作符和条件操作符。这里我们接着介绍合并操作符和异常处理操作符。

合并型操作符

合并操作符,是将两个或多个被观察者合并为一个被观察者,并向观察者传递事件。常见的合并操作符有startWith,concatWith,concat,merge,zip。

startWith操作符

        Observable.just(1, 2, 3)
        .startWith(Observable.just(10, 20, 30))
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.d(TAG, "accept: " + integer);
            }
        });
accept: 10
accept: 20
accept: 30
accept: 1
accept: 2
accept: 3

startWith可以将两个被观察者合并为一个被观察者,这里需要注意的是,startWith里面的被观察者是先发送的。
concatWith操作符
concatWith的功能和startWith相同,只是concatWith里的面被观察者是后发送的。

concat操作符

Observable.concat(Observable.just(1), Observable.just("23"), Observable.just(45))
            .subscribe(new Consumer<Object>() {
                @Override
                public void accept(Object o) throws Exception {
                    Log.d(TAG, "accept: " + o);
                }
            });
 accept: 1
 accept: 23
 accept: 45

concat操作符是将至多四个被观察者合并为一个被观察者。

merge操作符

Observable observable1 = Observable.intervalRange(1, 3, 1,2, TimeUnit.SECONDS);
Observable observable2 = Observable.intervalRange(6, 3, 1,2, TimeUnit.SECONDS); // 6 7 8 9 10
Observable observable3 = Observable.intervalRange(11, 3, 1,2, TimeUnit.SECONDS); // 11 12 13 14 15
Observable.merge(observable1,observable2,observable3)
   .subscribe(new Consumer<Object>() {
   @Override
    public void accept(Object o) throws Exception {
        Log.d(TAG, "accept: " + o);
    }
});
accept: 1
accept: 6
accept: 11
accept: 2
accept: 7
accept: 12
accept: 3
accept: 8
accept: 13

同样merge操作符还是至多能让四个被观察者合并,这里我之所以用intervalRange的方式创建就是为了证明每个被合并的观察者都是并列执行的。

zip操作符

Observable.zip(Observable.just("语文","数学","英语"),Observable.just(90,99,82,95), new BiFunction<String, Integer, StringBuffer>() {
    @Override
    public StringBuffer apply(String s, Integer integer) throws Exception {
        return new StringBuffer().append("课程" + s).append("==").append(integer+"");
    }
})
.subscribe(new Consumer<Object>() {
    @Override
    public void accept(Object o) throws Exception {
        Log.d(TAG, "accept: " + o);
    }
});
 accept: 课程语文==90
 accept: 课程数学==99
 accept: 课程英语==82

zip操作符是将两个或多个被观察者发送的数据一一对应组合起来成为一个新的数据发送给观察者。
这里需要注意的是,根据代码和日志我们可以看到,被观察者二多了一个95的数据,但是日志并没有打印出来,这是因为该数据在被观察者一中不存在对应关系,所以就被舍弃了。

异常处理型操作符

异常处理操作符在Rxjava中的功能十分强大,可以对异常进行不同程度的处理和转换。常见的异常处理操作符有onErrorReturn,onErrorResumeNext,onExceptionResumeNext,retry。。

onErrorReturn操作符

Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
        for (int i = 0; i < 100; i++) {
            if (i == 5) {
                e.onError(new IllegalAccessError("我要报错了")); // 发射异常事件
            }
            e.onNext(i);
        }
        e.onComplete();
    }
})
.onErrorReturn(new Function<Throwable, Integer>() {
    @Override
    public Integer apply(Throwable throwable) throws Exception {
        Log.d(TAG, "onErrorReturn: " + throwable.getMessage());
        return 400; // 400代表有错误,给下一层,目前 下游 观察者
    }
})
.subscribe(new Observer<Integer>() {
    @Override
    public void onSubscribe(Disposable d) {

    }

    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "onNext: " + integer); // 400
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "onError: " + e.getMessage());
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "onComplete: ");
    }
});
onNext: 0
onNext: 1
onNext: 2
onNext: 3
onNext: 4
onErrorReturn: 我要报错了
onNext: 400
onComplete: 

onErrorReturn操作符,可以拦截到被观察者使用onError发送的异常,并且给观察者发送一个标识。

onErrorResumeNext操作符
与onErrorReturn操作符类似,不同的地方在于他返回的是一个被观察者,而我们可以在这个被观察者内多次发送事件。

onExceptionResumeNext操作符

Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
        for (int i = 0; i < 100; i++) {
            if (i == 5) {
//                e.onError(new IllegalAccessError("我要报错了")); // 发射异常事件
                throw new Exception("出错了");
            }
            e.onNext(i);
        }
        e.onComplete();
    }
})
.onExceptionResumeNext(new ObservableSource<Integer>() {
    @Override
    public void subscribe(Observer<? super Integer> observer) {
        observer.onNext(404); // 可以让程序 不崩溃的
    }
})
.subscribe(new Observer<Integer>() {
    @Override
    public void onSubscribe(Disposable d) {

    }

    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "onNext: " + integer); // 400
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "onError: " + e.getMessage());
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "onComplete: ");
    }
});
onNext: 0
onNext: 1
onNext: 2
onNext: 3
onNext: 4
onNext: 404

onExceptionResumeNext操作符非常强大,他同样可以处理onError事件,但它与上面两个操作符不同的是,它可以处理throw抛出的异常,使程序不崩溃。

retry操作符

Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
        for (int i = 0; i < 100; i++) {
            if (i == 3) {
                e.onError(new IllegalAccessError("我要报错了")); // 发射异常事件
            }
            e.onNext(i);
        }
        e.onComplete();
    }
})
//.retry(2, new Predicate<Throwable>() {
//    @Override
//    public boolean test(Throwable throwable) throws Exception {
//        return true;
//    }
//
.retry(new BiPredicate<Integer, Throwable>() {
    @Override
    public boolean test(Integer integer, Throwable throwable) throws Exception {
        if (integer>=3){
            return false;
        }
        return true;
    }
})
.subscribe(new Observer<Integer>() {
    @Override
    public void onSubscribe(Disposable d) {

    }

    @Override
    public void onNext(Integer integer) {
        Log.d(TAG, "onNext: " + integer); // 400
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "onError: " + e.getMessage());
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "onComplete: ");
    }
});
onNext: 0
onNext: 1
onNext: 2
onNext: 0
onNext: 1
onNext: 2
onNext: 0
onNext: 1
onNext: 2
onError: 我要报错了

retry操作符是发生异常时的重试操作符,上面两种的结果都是一样的,第二种写法可以得到当前发送的次数。
这里需要注意的是,第一种写法数量是重试的数量,并没有计算第一次发送的。

至此,Rxjava中常见的操作符已经全部讲解完成。

相关文章

  • Rxjava(四)之合并操作符与异常处理操作符

    前言 上文我们介绍了过滤操作符和条件操作符。这里我们接着介绍合并操作符和异常处理操作符。 合并型操作符 合并操作符...

  • Rxjava---操作符篇---组合 / 合并操作符

    原文链接:Android RxJava操作符详解 系列:组合 / 合并操作符Android RxJava 实战系列...

  • zip操作符的error处理

    熟悉rxjava的同学肯定对操作符不会陌生,比如我们使用map操作符处理数据,使用zip操作符合并多个请求,这里演...

  • Rxjava2-二、操作符

    Rxjava记录总结操作符:创建操作符、转换操作符、合并操作符、过滤操作符、其他操作符、条件操作符. 创建操作符 ...

  • Combine - Operator(操作符)(二)

    线程 操作符|编码 操作符 |合并 操作符|错误处理 操作符|处理多个订阅者 操作符 线程 操作符 receive...

  • RxJava操作符系列四

    RxJava操作符系列传送门 RxJava操作符源码RxJava操作符系列一RxJava操作符系列二RxJava操...

  • RxJava操作符系列五

    RxJava操作符系列传送门 RxJava操作符源码RxJava操作符系列一RxJava操作符系列二RxJava操...

  • RxJava操作符系列六

    RxJava操作符系列传送门 RxJava操作符源码RxJava操作符系列一RxJava操作符系列二RxJava操...

  • RxJava操作符系列三

    RxJava操作符系列传送门 RxJava操作符源码RxJava操作符系列一RxJava操作符系列二 前言 在之前...

  • RxJava1源码分析

    我对于RxJava的异常处理和上抛方式有一些不解,而上网查找的文章都是RxJava的一些用于处理异常的操作符,所以...

网友评论

      本文标题:Rxjava(四)之合并操作符与异常处理操作符

      本文链接:https://www.haomeiwen.com/subject/moqxzktx.html