美文网首页
Android Rxjava框架的原理和使用

Android Rxjava框架的原理和使用

作者: 像程序那样去思考 | 来源:发表于2023-03-25 22:10 被阅读0次

    原理

    Rx是Reactive Extensions的缩写的简写,可以使用可观察数据流对编程接口进行异步编程,它结合了观察者模式,迭代器模式和函数式的精华。

    Rxjava是一种异步数据处理库,也是一种观察者模式。最早是Netflix公司用于重构当前架构时减少REST调用的次数,参考了Microsoft公司的响应式编程,把Microsoft的Rx库迁移到Java JVM中,其中最有名的就是RxJava。

    它的特点主要有以下:

    1. 支持Java 8 Lambda。
    2. 支持异步和同步。
    3. 单一依赖关系。
    4. 简洁,优雅。

    RxAndroid

    在开发项目的时候,开发者在使用Rxjava时会搭配RxAndroid,他是针对Rxjava在Android平台使用的一个响应式扩展组件。使用RxAndroid的Schedulers(调度器)可以解决Android主线程问题, 多线程等问题。

    观察者模式的四大要素

    1. Observable 被观察者
    2. Observer
    3. 观察者 subscribe 订阅
    4. 事件
    image.png

    观察者订阅被观察者,一旦被观察者发出事件,观察者就可以接收到。

    扩展的观察者模式

    image.png

    onNext()订阅了一个事件,当事件完成时会回调onComplete(),在完成过程中发生了异常会回调onError()。

    使用

    依赖

    //在Project的gradle下添加maven仓库
    maven { url "https://oss.jfrog.org/libs-snapshot" }
    
    implementation 'io.reactivex.rxjava3:rxjava:3.0.4'
    implementation 'io.reactivex.rxjava3:rxandroid:3.0.0'
    

    Hello World

    //1.创建被观察者
    Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
                @Override
                public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                    emitter.onNext("hello world");
                    emitter.onComplete();
                }
            });
    //2.创建观察者
    Observer<String> observer = new Observer<String>() {
                @Override
                public void onSubscribe(@NonNull Disposable d) {
                    System.out.println("onSubscribe():");
                }
    
                @Override
                public void onNext(@NonNull String s) {
                    System.out.println("onNext():" + s);
                }
    
                @Override
                public void onError(@NonNull Throwable e) {
                    System.out.println("onError():" + e.getMessage());
                }
    
                @Override
                public void onComplete() {
                    System.out.println("onComplete():");
                }
            };
    //3.订阅事件
    observable.subscribe(observer);
    

    注意:onError()和onComplete()只会回调一个。

    操作符

    Creating Observables(创建 Observable)

    Create

    //链式写法
            Observable.create(new ObservableOnSubscribe<String>() {
                @Override
                public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                    emitter.onNext("hello world");
                    emitter.onComplete();
                }
            }).subscribe(new Observer<String>() {
                @Override
                public void onSubscribe(Disposable d) {
                    System.out.println("onSubscribe():"+d.toString());
                }
    
                @Override
                public void onNext(String o) {
                    System.out.println("onNext():" + o);
                }
    
                @Override
                public void onError(Throwable e) {
                    System.out.println("onError():" + e.getMessage());
                }
    
                @Override
                public void onComplete() {
                    System.out.println("onComplete():");
                }
            });
    

    Just
    使用将为你创建一个Observable并自动为你调用onNext( )发射数据,just中传递的参数将直接在Observer的onNext()方法中接收到。

    Observable.just("hello world").subscribe(new Observer<String>() {
                @Override
                public void onSubscribe(Disposable d) {
                }
    
                @Override
                public void onNext(String s) {
                    System.out.println(s);
                }
    
                @Override
                public void onError(Throwable e) {
    
                }
    
                @Override
                public void onComplete() {
    
                }
            });
    

    From

    将一个Iterable, 一个Future, 或者一个数组转换成一个Observable,遍历集合,发送每个item。相当于多次回调onNext()方法,每次传入一个item。

    List<String> list = new ArrayList<>();
            for (int i = 0; i < 10; i++) {
                list.add("Hello" + i);
            }
    
            Observable.fromArray(list).subscribe(new Observer<List<String>>() {
                @Override
                public void onSubscribe(Disposable d) {
    
                }
    
                @Override
                public void onNext(List<String> strings) {
                    System.out.println(strings);
                }
    
                @Override
                public void onError(Throwable e) {
    
                }
    
                @Override
                public void onComplete() {
    
                }
            });
    

    Defer

    当观察者订阅时,才创建Observable,并且针对每个观察者创建都是一个新的Observable。 以何种方式创建这个Observable对象,当满足回调条件后,就会进行相应的回调。

    value = "2020/12/13";
        Observable<String> observable = Observable.defer(new Supplier<ObservableSource<? extends String>>() {
            @Override
            public ObservableSource<? extends String> get() throws Throwable {
                return Observable.just(value);
            }
        });
        value = "12345";
        observable.subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
    
            }
    
            @Override
            public void onNext(String s) {
                System.out.println(s);
            }
    
            @Override
            public void onError(Throwable e) {
    
            }
    
            @Override
            public void onComplete() {
    
            }
        });
    

    Empty/Never/Throw

    Empty是创建一个不发射任何数据但是正常终止的Observable。 Never是创建一个不发射数据也不终止的Observable。 Throw是创建一个不发射数据以一个错误终止的Observable。 这三个操作符生成的Observable行为非常特殊和受限。测试的时候很有用,有时候也用于结合其它的Observables,或者作为其它需要Observable的操作符的参数。

    Observable.defer(new Supplier<ObservableSource<?>>() {
                @Override
                public ObservableSource<?> get() throws Throwable {
                    return Observable.error(new Throwable("你写了个bug"));
                }
            }).subscribe(new Observer<Object>() {
                @Override
                public void onSubscribe(@NonNull Disposable d) {
    
                }
    
                @Override
                public void onNext(@NonNull Object o) {
                }
    
                @Override
                public void onError(@NonNull Throwable e) {
                    System.out.println(e.getMessage());
                }
    
                @Override
                public void onComplete() {
    
                }
            });
    

    Interval

    创建一个按固定时间间隔发射整数序列的Observable,可用作定时器。即按照固定1秒一次调用onNext()方法。

    //TrampolineScheduler不会立即执行,当其他排队任务结束时才执行,TrampolineScheduler运行在主线程。
    
    Observable.interval(1000, TimeUnit.MILLISECONDS, Schedulers.trampoline()).subscribe(new Observer<Long>() {
                @Override
                public void onSubscribe(@NonNull Disposable d) {
                }
    
                @Override
                public void onNext(@NonNull Long aLong) {
                    System.out.println(aLong);
                }
    
                @Override
                public void onError(@NonNull Throwable e) {
    
                }
    
                @Override
                public void onComplete() {
    
                }
            });
    

    Repeat

    创建一个Observable,该Observable的事件可以重复调用。

     Observable.just(123).repeat(2).subscribe(new Observer<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {
    
                }
    
                @Override
                public void onNext(Integer integer) {
                    System.out.println(integer);
                }
    
                @Override
                public void onError(Throwable e) {
    
                }
    
                @Override
                public void onComplete() {
    
                }
            });
    

    Start

    返回一个Observable,它发射一个类似于函数声明的值。

    Timer

    创建一个Observable,它在一个给定的延迟后发射一个特殊的值,即表示延迟2秒后,调用onNext()方法。

     Observable.timer(2000, TimeUnit.MILLISECONDS,Schedulers.trampoline()).subscribe(new Observer<Long>() {
                @Override
                public void onSubscribe(Disposable d) {
    
                }
    
                @Override
                public void onNext(Long aLong) {
                    System.out.println(aLong);
                }
    
                @Override
                public void onError(Throwable e) {
    
                }
    
                @Override
                public void onComplete() {
    
                }
            });
    

    Transforming Observables(转换 Observable)

    Map

    Map就是把原来的Observable对象转换成另一个Observable对象,同时将传输的数据进行一些灵活的操作,方便Observer获得想要的数据形式。

    //Integer to String
    Observable.just(123).map(new Function<Integer, String>() {
                @Override
                public String apply(Integer s) throws Exception {
                    return s.toString();
                }
            }).subscribe(new Observer<String>() {
                @Override
                public void onSubscribe(Disposable d) {
    
                }
    
                @Override
                public void onNext(String s) {
                    System.out.println(s);
                }
    
                @Override
                public void onError(Throwable e) {
    
                }
    
                @Override
                public void onComplete() {
    
                }
            });
    

    flatMap对于数据的转换比map()更加彻底,如果发送的数据是集合,flatmap()重新生成一个Observable对象,并把数据转换成Observer想要的数据形式。它可以返回任何它想返回的Observable对象。

     Observable.just(1, 2, 3, 4, 5).flatMap(new Function<Integer, ObservableSource<? extends String>>() {
                @Override
                public ObservableSource<? extends String> apply(Integer integer) throws Exception {
                    return Observable.just(integer.toString());
                }
            }).subscribe(new Observer<String>() {
                @Override
                public void onSubscribe(Disposable d) {
    
                }
    
                @Override
                public void onNext(String o) {
                    System.out.println(o);
                }
    
                @Override
                public void onError(Throwable e) {
    
                }
    
                @Override
                public void onComplete() {
    
                }
            });
    

    GroupBy

    根据规则对数据进行分组。

    Observable.just(1, 2, 3, 4, 5).groupBy(new Function<Integer, String>() {
                @Override
                public String apply(Integer integer) throws Exception {
                    return integer % 2==0?"偶数":"奇数";
                }
            }).subscribe(new Observer<GroupedObservable<String, Integer>>() {
                @Override
                public void onSubscribe(@NonNull Disposable d) {
    
                }
    
                @Override
                public void onNext(@NonNull final GroupedObservable<String, Integer> arg0) {
                    arg0.subscribe(new Observer<Integer>() {
                        @Override
                        public void onSubscribe(Disposable d) {
    
                        }
    
                        @Override
                        public void onNext(Integer integer) {
                            System.out.println(arg0.getKey() + "-------" + integer);
                        }
    
                        @Override
                        public void onError(Throwable e) {
    
                        }
    
                        @Override
                        public void onComplete() {
    
                        }
                    });
                }
    
                @Override
                public void onError(@NonNull Throwable e) {
    
                }
    
                @Override
                public void onComplete() {
    
                }
            });
    

    Buffer

    定期收集Observable的数据放进一个数据包裹,然后发射这些数据包裹,而不是一次发射一个值。

    Observable.just(1, 2, 3, 4, 5,6).buffer(3).subscribe(new Observer<List<Integer>>() {
                @Override
                public void onSubscribe(Disposable d) {
    
                }
    
                @Override
                public void onNext(List<Integer> integers) {
                    System.out.println(integers);
                }
    
                @Override
                public void onError(Throwable e) {
    
                }
    
                @Override
                public void onComplete() {
    
                }
            });
    

    Scan

    将数据进行累加。

    Observable.range(1, 5).scan(new BiFunction<Integer, Integer, Integer>() {
                @Override
                public Integer apply(Integer integer, Integer integer2) throws Exception {
                    return integer + integer2;
                }
            }).subscribe(new Observer<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {
    
                }
    
                @Override
                public void onNext(Integer integer) {
                    System.out.println(integer);
                }
    
                @Override
                public void onError(Throwable e) {
    
                }
    
                @Override
                public void onComplete() {
    
                }
            });
    

    Window

    window和buffer相似,它返回的是一个Observable对象,它根据一系列任务规则把数据聚集到一个列表。

    //        window第一个参数count:每个窗口应发射前的最大大小;第二个:在启动新窗口之前需要跳过多少项
            Observable.range(1, 5).window(5, 1).subscribe(new Observer<Observable<Integer>>() {
                @Override
                public void onSubscribe(Disposable d) {
    
                }
    
                @Override
                public void onNext(final Observable<Integer> arg0) {
                    System.out.println(arg0);
                    arg0.subscribe(new Observer<Integer>() {
                        @Override
                        public void onSubscribe(Disposable d) {
    
                        }
    
                        @Override
                        public void onNext(Integer integer) {
                            System.out.println("---"+integer);
                        }
    
                        @Override
                        public void onError(Throwable e) {
    
                        }
    
                        @Override
                        public void onComplete() {
    
                        }
                    });
                }
    
                @Override
                public void onError(Throwable e) {
    
                }
    
                @Override
                public void onComplete() {
    
                }
            });
    

    Filtering Observables(过滤 Observable)

    Debounce

    操作间隔一定时间内没有做任何操作,数据才会发送到观察者。

    Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> arg0) throws Exception {
                    for (int i = 0; i < 10; i++) {
                        Thread.sleep(2000);
                        arg0.onNext(i);
                    }
                    arg0.onComplete();
                }
            }).debounce(1, TimeUnit.SECONDS).subscribe(new Observer<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {
    
                }
    
                @Override
                public void onNext(Integer o) {
                    System.out.println(o);
                }
    
                @Override
                public void onError(Throwable e) {
    
                }
    
                @Override
                public void onComplete() {
                }
            });
    

    Distinct

    去掉重复数据的操作符。

    Observable.just(1, 2, 3, 2, 3).distinct().subscribe(new Observer<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {
    
                }
    
                @Override
                public void onNext(Integer integer) {
                    System.out.println(integer);
                }
    
                @Override
                public void onError(Throwable e) {
    
                }
    
                @Override
                public void onComplete() {
    
                }
            });
    

    ElementAt

    取出指定位置的数据。

    Observable.just(1, 2, 3, 2, 3).distinct().subscribe(new Observer<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {
    
                }
    
                @Override
                public void onNext(Integer integer) {
                    System.out.println(integer);
                }
    
                @Override
                public void onError(Throwable e) {
    
                }
    
                @Override
                public void onComplete() {
    
                }
            });
    

    Filter

    对数据进行指定规则的过滤。

    Observable.just(1, 2, 3, 4).filter(new Predicate<Integer>() {
                @Override
                public boolean test(Integer integer) throws Exception {
    
                    return integer > 2;
                }
            }).subscribe(new Observer<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {
    
                }
    
                @Override
                public void onNext(Integer integer) {
                    System.out.println(integer);
                }
    
                @Override
                public void onError(Throwable e) {
    
                }
    
                @Override
                public void onComplete() {
    
                }
            });
    

    First

    取数据中的第一个数据。

    //first参数:defaultItem: 当前Observable不发射任何内容时发出的默认项
            Observable.just(1, 2, 3,4,5).first(10).subscribe(new SingleObserver<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {
    
                }
    
                @Override
                public void onSuccess(Integer integer) {
                    System.out.println(integer);
                }
    
                @Override
                public void onError(Throwable e) {
    
                }
            });
    

    IgnoreElements

    忽略所有的数据,不向观察者发送数据,直接回调onError或onComplete()。

     Observable.just(6, 9, 1, 3).distinct().ignoreElements().subscribe(new CompletableObserver() {
                @Override
                public void onSubscribe(Disposable d) {
    
                }
    
                @Override
                public void onComplete() {
    
                }
    
                @Override
                public void onError(Throwable e) {
    
                }
    
            });
    

    Last

    列表数据最后指定的数位项数据。 SingleObserver只发射一条单一的数据,或者一条异常通知,不能发射完成通知,其中数据与通知只能发射一个。

         Observable.just(6, 4, 2,4).distinct().map(new Function<Integer, String>() {
                @Override
                public String apply(Integer integer) throws Exception {
                    return integer.toString();
                }
            }).last("4").subscribe(new SingleObserver<String>() {
                @Override
                public void onSubscribe(Disposable d) {
    
                }
    
                @Override
                public void onSuccess(String s) {
                    System.out.println(s);
                }
    
                @Override
                public void onError(Throwable e) {
    
                }
            });
     Observable.just(6, 4, 2,4).distinct().map(new Function<Integer, String>() {
                @Override
                public String apply(Integer integer) throws Exception {
                    return integer.toString();
                }
            }).last("4").subscribe(new SingleObserver<String>() {
                @Override
                public void onSubscribe(Disposable d) {
    
                }
    
                @Override
                public void onSuccess(String s) {
                    System.out.println(s);
                }
    
                @Override
                public void onError(Throwable e) {
    
                }
            });
    

    Sample

    对数据源进行样本采集,发送给观察者。

    Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> arg0) throws Exception {
                    for (int i = 0; i < 10; i++) {
                        Thread.sleep(1000);
                        arg0.onNext(i);
                    }
                    arg0.onComplete();
                }
            }).sample(4, TimeUnit.SECONDS).subscribe(new Observer<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {
    
                }
    
                @Override
                public void onNext(Integer integer) {
    
                    System.out.println(integer);
                }
    
                @Override
                public void onError(Throwable e) {
    
                }
    
                @Override
                public void onComplete() {
    
                }
            });
    

    Skip

    跳过指定列表项数据的指定项数据。

    Observable.just(6, 3, 2, 1).skip(2).subscribe(new Observer<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {
    
                }
    
                @Override
                public void onNext(Integer integer) {
                    System.out.println(integer);
                }
    
                @Override
                public void onError(Throwable e) {
    
                }
    
                @Override
                public void onComplete() {
    
                }
            });
    

    SkipLast

    跳过列表数据的最后几位数据。

    Observable.just(1, 2, 3, 5).skipLast(2).subscribe(new Observer<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {
    
                }
    
                @Override
                public void onNext(Integer integer) {
                    System.out.println(integer);
                }
    
                @Override
                public void onError(Throwable e) {
    
                }
    
                @Override
                public void onComplete() {
    
                }
            });
    

    Take

    只取列表数据的前几项。

    Observable.just(1, 2, 3, 4).take(2).takeLast(1).subscribe(new Observer<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {
    
                }
    
                @Override
                public void onNext(Integer integer) {
                    System.out.println(integer);
                }
    
                @Override
                public void onError(Throwable e) {
    
                }
    
                @Override
                public void onComplete() {
    
                }
            });
    

    TakeLast

    取列表数据项的最后几项数据。 Consumer是简易版的Observer,他有多重重载,可以自定义你需要处理的信息,他只提供一个回调接口accept,由于没有onError和onCompete,无法再 接受到onError或者onCompete之后,实现函数回调。

    Observable.just(1, 2, 3, 4).takeLast(2).take(1).subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println(integer);
                }
            });
    Combining Observables(组合 Observable)
    

    Zip

    通过一个函数将多个Observables的发射物结合到一起,基于这个函数的结果为每个结合体发射单个数据项。当其中一个Observable发送数据结束或异常,另外一个也停止发送。

     Observable<Integer> observable = Observable.just(10, 20, 30);
            Observable<Integer> observable1 = Observable.just(4, 8, 12, 16);
            Observable.zip(observable, observable1, new BiFunction<Integer, Integer, Integer>() {
                @Override
                public Integer apply(Integer integer, Integer integer2) throws Exception {
                    return integer + integer2;
                }
            }).subscribe(new Observer<Integer>() {
                @Override
                public void onSubscribe(@NonNull Disposable d) {
    
                }
    
                @Override
                public void onNext(@NonNull Integer integer) {
                    System.out.println(integer);
                }
    
                @Override
                public void onError(@NonNull Throwable e) {
    
                }
    
                @Override
                public void onComplete() {
    
                }
            });
    

    Merge

    合并多个Observables的发射物。

    Observable<Integer> observable = Observable.just(10, 20, 30);
            Observable<Integer> observable1 = Observable.just(4, 8, 12, 16);
            Observable.merge(observable, observable1).subscribe(new Observer<Integer>() {
                @Override
                public void onSubscribe(@NonNull Disposable d) {
    
                }
    
                @Override
                public void onNext(@NonNull Integer integer) {
                    System.out.println(integer);
                }
    
                @Override
                public void onError(@NonNull Throwable e) {
    
                }
    
                @Override
                public void onComplete() {
    
                }
            });
    

    StartWith

    在数据序列的开头插入一条指定的项。

    Observable<Integer> observable = Observable.just(10, 20, 30);
            Observable<Integer> observable1 = Observable.just(4, 8, 12, 16);
            Disposable subscribe = observable.startWith(observable1).subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println(integer);
                }
            });
    

    CombineLatest

    当两个Observables中的任何一个发射了数据时,使用一个函数结合每个Observable发射的最近数据项,并且基于这个函数的结果发射数据。

    Observable<Integer> observable = Observable.just(1, 3, 5);
            Observable<Integer> observable1 = Observable.just(2, 4, 6);
            Observable.combineLatest(observable, observable1, new BiFunction<Integer, Integer, Integer>() {
                @Override
                public Integer apply(Integer integer, Integer integer2) throws Throwable {
                    System.out.println("integer:" + integer + "---" + "integer2:" + integer2);
                    return integer + integer2;
                }
            }).subscribe(new Observer<Integer>() {
                @Override
                public void onSubscribe(@NonNull Disposable d) {
    
                }
    
                @Override
                public void onNext(@NonNull Integer integer) {
                    System.out.println(integer);
                }
    
                @Override
                public void onError(@NonNull Throwable e) {
    
                }
    
                @Override
                public void onComplete() {
    
                }
            });
    

    Join

    任何时候,只要在另一个Observable发射的数据定义的时间窗口内,这个Observable发射了一条数据,就结合两个Observable发射的数据。

    String[] args1 = new String[]{"张欣1", "张欣2", "张欣3", "张欣4", "张欣5"};
        String[] args2 = new String[]{"春晓1", "春晓2", "春晓3", "春晓4"};
        Observable<String> o1 = Observable.fromArray(args1);
        Observable<String> o2 = Observable.fromArray(args2);
        //相同的数组可以进行合并
        o2.join(o1, new Function<String, Observable<Long>>() {
            @Override
            public Observable<Long> apply(String s) throws Exception {
                return Observable.timer(2, TimeUnit.SECONDS);
            }
        }, new Function<String, Observable<Long>>() {
            @Override
            public Observable<Long> apply(String s) {
                return Observable.timer(2, TimeUnit.SECONDS);
            }
        }, new BiFunction<String, String, String>() {
            @Override
            public String apply(String s, String s2) throws Exception {
                return s + "-&--" + s2;
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
    
            }
    
            @Override
            public void onNext(@NonNull String s) {
                System.out.println(s);
            }
    
            @Override
            public void onError(@NonNull Throwable e) {
    
            }
    
            @Override
            public void onComplete() {
    
            }
        });
    

    SwitchOnNext

    将一个发射多个Observables的Observable转换成另一个单独的Observable,后者发射那些Observables最近发射的数据项。

            final Observable<Long> observable1 = Observable.intervalRange(1, 5, 1, 500, TimeUnit.MILLISECONDS);
            final Observable<Long> observable2 = Observable.intervalRange(10, 5, 1, 500, TimeUnit.MILLISECONDS);
            Observable<Observable<Long>> sources = Observable.create(new ObservableOnSubscribe<Observable<Long>>() {
                @Override
                public void subscribe(ObservableEmitter<Observable<Long>> emitter) throws Exception {
                    emitter.onNext(observable1);
                    Thread.sleep(1000);
                    // 此时发射一个新的observable2,将会取消订阅observable1
                    emitter.onNext(observable2);
                    emitter.onComplete();
                }
            });
    
            // 创建发射含有Error通知的Observable序列的Observable
            Observable<Observable<Long>> sourcesError = Observable.create(new ObservableOnSubscribe<Observable<Long>>() {
    
                @Override
                public void subscribe(ObservableEmitter<Observable<Long>> emitter) throws Exception {
                    emitter.onNext(observable1);
    //                emitter.onNext(Observable.error(new Exception("Error Test1!"))); // 发射一个发射Error通知的Observable
    //                emitter.onNext(Observable.error(new Exception("Error Test2!"))); // 发射一个发射Error通知的Observable
                    Thread.sleep(1000);
                    // 此时发射一个新的observable2,将会取消订阅observable1
                    emitter.onNext(observable2);
                    emitter.onComplete();
                }
            });
    
            // 1. switchOnNext(ObservableSource<ObservableSource> sources, int bufferSize)
            // 可选参数 bufferSize: 缓存数据项大小
            // 接受一个发射Observable序列的Observable类型的sources,
            // 当sources发射一个新的Observable后,则会取消订阅前面的旧observable,直接开始接受新Observable的数据
            Disposable subscribe = Observable.switchOnNext(sources)
                    .subscribe(new Consumer<Long>() {
    
                        @Override
                        public void accept(Long integer) throws Exception {
                            System.out.println("--> accept(1): " + integer);
                        }
                    });
            System.out.println("--------------------------------------------------------------------");
            // 2. switchOnNextDelayError(ObservableSource<ObservableSource> sources, int prefetch)
            // 可选参数 prefetch: 与读取数据项大小
            // 当sources发射一个新的Observable后,则会取消订阅前面的旧observable,直接开始接受新Observable的数据,
            // 保留onError通知直到合并后的Observable所有的数据发射完成,在那时它才会把onError传递给观察者
            Observable.switchOnNextDelayError(sourcesError)
                    .subscribe(new Observer<Long>() {
    
                        @Override
                        public void onSubscribe(Disposable d) {
                            System.out.println("--> onSubscribe(2)");
                        }
    
                        @Override
                        public void onNext(Long t) {
                            System.out.println("--> onNext(2): " + t);
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            // 判断是否是CompositeException对象(发生多个Observable出现Error时会发送的对象)
                            if (e instanceof CompositeException) {
                                CompositeException compositeException = (CompositeException) e;
                                List<Throwable> exceptions = compositeException.getExceptions();
                                System.out.println("--> onError(2): " + exceptions);
                            } else {
                                System.out.println("--> onError(2): " + e);
                            }
                        }
    
                        @Override
                        public void onComplete() {
                            System.out.println("--> onComplete(2)");
                        }
                    });
    

    Error Handling Operators(处理错误)

    Catch

    Catch操作符拦截原始Observable的onError通知,将它替换为其它的数据项或数据序列,让产生的Observable能够正常终止或者根本不终止。 还有一个叫onErrorResumeNext的操作符,它的行为与Catch相似。 RxJava将Catch实现为三个不同的操作符:

    • onErrorReturn 让Observable遇到错误时发射一个特殊的项并且正常终止。
    • onErrorResumeNext 让Observable在遇到错误时开始发射第二个Observable的数据序。
    • onExceptionResumeNext 让Observable在遇到错误时继续发射后面的数据项。
    Observable.just(1,2,3).onErrorReturn(new Function<Throwable, Integer>() {
                @Override
                public Integer apply(Throwable throwable) throws Throwable {
                    return  null;
                }
            }).subscribe(new Observer<Integer>() {
                @Override
                public void onSubscribe(@NonNull Disposable d) {
    
                }
    
                @Override
                public void onNext(@NonNull Integer integer) {
                    System.out.println(integer);
                }
    
                @Override
                public void onError(@NonNull Throwable e) {
                    System.out.println(e.getMessage());
                }
    
                @Override
                public void onComplete() {
    
                }
            });
    

    Retry

    如果原始Observable遇到错误,重新订阅它期望它能正常终止。 retryWhen和retry类似,区别是,retryWhen将onError中的Throwable传递给一个函数,这个函数产生另一个Observable,retryWhen观察它的结果再决定是不是要重新订阅原始的Observable。如果这个Observable发射了一项数据,它就重新订阅,如果这个Observable发射的是onError通知,它就将这个通知传递给观察者然后终止。 retryWhen默认在trampoline调度器上执行,你可以通过参数指定其它的调度器。

    场景:网络请求失败重试操作。

    final AtomicInteger atomicInteger = new AtomicInteger(3);
            Observable.create(new ObservableOnSubscribe<String>() {
                @Override
                public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable {
                    emitter.onNext(String.valueOf(System.currentTimeMillis()));
                    emitter.onError(new Error(String.valueOf(atomicInteger.decrementAndGet())));
                }
            }).retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
                @Override
                public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Throwable {
                    return throwableObservable;
                }
            }).subscribe(new Observer<String>() {
                @Override
                public void onSubscribe(@NonNull Disposable d) {
    
                }
    
                @Override
                public void onNext(@NonNull String s) {
                    System.out.println(s);
                }
    
                @Override
                public void onError(@NonNull Throwable e) {
                    System.out.println(e.getMessage());
                }
    
                @Override
                public void onComplete() {
    
                }
            });
    

    更多相关文章

    Android如何进阶:http://docs.qq.com/doc/DWHFqVHBMVEJPWUx
    Android面试题汇总:http://docs.qq.com/doc/DWGZIRFh5VEtYWE1D
    Android音视频需要学习哪些:http://docs.qq.com/doc/DWFFWZHNPTHZVdHFX
    Android常有的开源框架有哪些框:docs.qq.com/doc/DWHlGYUdseVhsSUda
    Android车载应需要学习哪些:docs.qq.com/doc/DWEl0blBabXVvU2Nw Android
    Framework怎么学:docs.qq.com/doc/DWFdlc2JocEtNbEJ1

    Schedulers(调度器)

    它是RxJava以一种及其简单的方式解决多线程问题的机制。

    种类

    io() 用于I/O操作。 computation() 计算,计算工作默认的调度器,与I/O操作无关。 immediate() 立即执行,允许立即在当前线程执行你指定的工作。 newThread() 新线程,为指定任务创建新线程。 trampoline() 顺序处理,按需处理队列,并运行队列的每一个任务。

    AndroidSchedulers

    RxAndroid提供在Android平台的调度器(指定观察者在主线程)。

    • SubscribeOn 方法用于每个Observable对象
    • ObserveOn 方法用于每个Subscriber(Observer)对象
     observable.subscribeOn(Schedulers.newThread())
                        .unsubscribeOn(Schedulers.newThread())
                        .observeOn(AndroidSchedulers.mainThread())
                        .onErrorResumeNext(new HttpErrorHandler<T>())
                        .subscribe(observer);
    

    使用场景

    与Retrofit结合使用

    retrofitBuilder = new Retrofit.Builder();
            retrofitBuilder.client(okHttpClient)
                    .addConverterFactory(ScalarsConverterFactory.create())
                    .addConverterFactory(GsonConverterFactory.create())
                    .addCallAdapterFactory(RxJava2CallAdapterFactory.create());
                    
    public <T> void doCall(LifecycleOwner owner, Observable<T> observable, final HttpCallBack<T> httpCallBack) {
    
            if (observable == null || httpCallBack == null) {
                throw new IllegalArgumentException("observable或HttpCallBack为空");
            }
    
            //观察者_网络请求状态
            BaseObserver<T> observer = new BaseObserver<T>() {
                @Override
                public void onNext(T t) {
                    try {
                        if (t != null) {
                            httpCallBack.onSuccess(t);
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                        httpCallBack.onFailure(e);
                    }
                }
    
                @Override
                public void onError(Throwable e) {
                    httpCallBack.onFailure(e);
                }
    
            };
    
            if (owner == null) {
                //被观察者订阅观察者,根据生命周期取消订阅,子线程订阅主线程观察
                observable.subscribeOn(Schedulers.newThread())
                        .unsubscribeOn(Schedulers.newThread())
                        .observeOn(AndroidSchedulers.mainThread())
                        .map(getAppErrorHandler())
                        .onErrorResumeNext(new HttpErrorHandler<T>())
                        .subscribe(observer);
            } else {
                //被观察者订阅观察者,根据生命周期取消订阅,子线程订阅主线程观察
                observable.subscribeOn(Schedulers.newThread())
                        .unsubscribeOn(Schedulers.newThread())
                        .observeOn(AndroidSchedulers.mainThread())
                        .map(getAppErrorHandler())
                        .onErrorResumeNext(new HttpErrorHandler<T>())
                        .as(AutoDispose.autoDisposable(AndroidLifecycleScopeProvider.from(owner, Lifecycle.Event.ON_DESTROY)))
                        .subscribe(observer);
            }
        }
    

    与RxPermission结合使用

    RxPermission是基于RxJava的Android动态权限申请框架。

    使用(简单封装)

      public void initPermissions(String[] permissions, PermissionResult permissionResult) {
            if (rxPermissions == null) {
                rxPermissions = new RxPermissions(this);
            }
            rxPermissions.requestEachCombined(permissions)
                    .subscribe(permission -> {
                        if (permission.granted) {
                            permissionResult.onSuccess();
                        } else if (permission.shouldShowRequestPermissionRationale) {
                            permissionResult.onFailure();
                        } else {
                            permissionResult.onFailureWithNeverAsk();
                        }
                    });
        }
    

    代替EventBus

    EventBus是一个Android端优化的publish/subscribe消息总线,简化了应用程序内各组件间、组件与后台线程间的通信。更多相关请参考Android事件总线之EventBus。 RxJava也可以实现事件总线,因为它们都依据于观察者模式。我们使用RxJava替换EventBus,可以减少App的体积。

    使用

    private static volatile RxBus instance;
        private PublishSubject<Object> mRxtBus;
        
        public static RxBus getDefault() {
            if (instance == null) {
                synchronized (RxBus.class) {
                    instance = new RxBus();
                }
            }
            return instance;
        }
    
        private RxBus() {
            mRxtBus = PublishSubject.create();
        }
    
        public void post(String tag, Object event) {
            Message msg = new Message(tag, event);
            mRxtBus.onNext(msg);
        }
    
        public <T> Observable<T> toEvent(Class<T> eventType) {
            return mRxtBus.ofType(eventType);
        }
     }
    

    发送

    RxBus.getDefault().post("payValue",code);
    

    接收

    subscribe = RxBus.getDefault().toEvent(RxBus.Message.class).subscribe(new Consumer<RxBus.Message>() {
                @Override
                public void accept(RxBus.Message message) throws Throwable {
                    if ("payValue".equals(message.getTag())) {
                        Log.e("yhj", "accept: " + message.getEvent().toString());
                    }
                }
            });
    

    解绑

    if (subscribe != null && !subscribe.isDisposed()) {
                subscribe.dispose();
            }
    

    Rxjava内存泄漏的处理

    Rxjava的使用不当会导致内存泄漏,使用AutoDispose可以解决这个问题,它是一个随Android生命周期事件自动解绑Rxjava订阅的方便工具。

    使用

    结合JetPack的LifeCycle(生命周期感知型组件),根据生命周期取消订阅。

    //被观察者订阅观察者,根据生命周期取消订阅,子线程订阅主线程观察
                observable.subscribeOn(Schedulers.newThread())
                        .unsubscribeOn(Schedulers.newThread())
                        .observeOn(AndroidSchedulers.mainThread())
                        .map(getAppErrorHandler())
                        .onErrorResumeNext(new HttpErrorHandler<T>())
                        .as(AutoDispose.autoDisposable(AndroidLifecycleScopeProvider.from(owner, Lifecycle.Event.ON_DESTROY)))
                        .subscribe(observer);
    

    总结

    本文主要是对RxJava使用及Android常见使用场景进行总结,掌握这些还远远不够,RxJava还有许多强大的功能,诸如从磁盘/内存中获取缓存数据,背压策略,联想搜索优化等等。后面在项目开发中遇到相关场景再进行总结,更新。本文若有不当之处,请批评指正。

    相关文章

      网友评论

          本文标题:Android Rxjava框架的原理和使用

          本文链接:https://www.haomeiwen.com/subject/nggnrdtx.html