美文网首页RxJava
RxJava 的初理解和基本使用

RxJava 的初理解和基本使用

作者: tingtingtina | 来源:发表于2020-09-24 02:03 被阅读0次

    观察者设计模式

    提到RxJava 有点了解的就知道这个框架是基于观察者模式的,先来温习下观察者模式。

    观察者模式 UML

    被观察者(Observable)持有对观察者(Observer) 的引用,当被观察者发生某些变化,调用 观察者的 API 就可以了。

    RxJava 正是基于观察者模式的响应式编程框架。

    基于事件流编程

    啥是响应式?其实就是事件驱动,事件一发生,就会被相应接收处理。就这么简单,一张图就能表达。

    有一个起点和一个终点,从起点发送事件,终点接收处理事件。他们之间与一个订阅关系。

    Rxjava 使用示例

    先看下 Rxjava 的基本使用

    在使用的时候需要引入 rxjava 包

    // 依赖 RxAndroid 2x 的依赖库,完整的支持在Android 中使用,比如一些线程调度
    api 'io.reactivex.rxjava2:rxandroid:2.1.0'
    // 增加 Rxjava 2x 的依赖库
    api io.reactivex.rxjava:1.3.0"
    

    使用时,注意要引入 io 包中的类

    // 起点 可以先不用管,用 create 创建了一个 Observable
    Observable.create(new ObservableOnSubscribe<Integer>() {
    
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
    
                }
    })
    // 订阅
    .subscribe(
            // 终点 也可以不用管,new 了一个 Observer
            new Observer<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {
                }
    
                @Override
                public void onNext(Integer integer) {
                }
    
                @Override
                public void onError(Throwable e) {
                }
    
                @Override
                public void onComplete() {
                 }
           });
    

    Rxjava 的上游与下游

    事件流向分层:事件流可以分成上游与下游,事件的起点也就是上游,而终点就是事件的下游。

    上游可以发送多个消息给下游,像下面这样

    拆分实现时,分别创建观察者和被观察者,再订阅

            // 上游 Observable 被观察者
            Observable observable = Observable.create(new ObservableOnSubscribe<Integer>() {
                // 发射器 发射事件
                @Override
                public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                    log("1: 上游发射事件");
                    // 发射事件
                    emitter.onNext(1);
                    log("2: 上游发射完成");
                }
            });
    
            // 下游 Observer 观察者 处理事件
            Observer<Integer> observer = new Observer<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {
    
                }
    
                @Override
                public void onNext(Integer integer) {
                    log("3 下游处理事件 onNext " + integer);
                }
    
                @Override
                public void onError(Throwable e) {
    
                }
    
                @Override
                public void onComplete() {
    
                }
            };
    
            // 上游 订阅 下游
            observable.subscribe(observer);
    

    当然也可以像最初那样,链式调用

    RxJava 调用流程

            // 上游 Observable 被观察者
            Observable.create(new ObservableOnSubscribe<Integer>() {
                // 发射器 发射事件
                @Override
                public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                    log("2: 开始发射");
                    // 发射事件
                    emitter.onNext(1);
    
                    emitter.onComplete(); // 4
                    log("5:发射完成");
                }
            })
                    //订阅
                    .subscribe(new Observer<Integer>() {
                        // 下游 Observer 观察者 处理事件
                        @Override
                        public void onSubscribe(Disposable d) {
                            // 弹出加载框 loading……
                            log("1: 订阅成功");
                        }
    
                        @Override
                        public void onNext(Integer integer) {
                            log("3: 下游接收 onNext " + integer);
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            log("4: 下游接收 onError");
                        }
    
                        @Override
                        public void onComplete() {
                            // 隐藏加载 loading……
                            // 只有接收完成之后,上游的 log才会打印
                            log("4: 下游接收完成 onComplete");
                        }
                    });
    //-----------
    结果
    1: 订阅成功
    2: 开始发射
    3: 下游接收 onNext 1
    4: 下游接收完成 onComplete
    5:发射完成
    

    发射器事件

    RxJava 中有个事件发射器,用来发送事件。发射器有Emitter 有三个方法,onNext、onComplete、onError,调用后会对应下游观察者的同名方法。
    使用中有下面几个需要注意的地方

    1. onNext/onComplete/onError
    emitter.onNext(1);
    emitter.onComplete();
    // onComplete 之后,以下收不到
    emitter.onNext(2)
    

    ↑执行 emitter.onComplete 之后,继续发射,下游不再接受上游事件

    emitter.onNext(1);
    emitter.onError(new IllegalAccessError("error"));
    // onError 之后,以下收不到
    emitter.onNext(2);
    

    ↑执行 emitter.onError 之后,继续发射,下游不再接受上游事件

    1. onError 和 onComplete 顺序测试
    emitter.onNext(1);
    emitter.onComplete();
    emitter.onError(new IllegalAccessError("error"));
    

    ↑ onComplete 后再调用 onError 会报错

    emitter.onError(new IllegalAccessError("error"));
    emitter.onComplete();
    

    ↑ 先发射 onError 再发射 onComplete 不会报错,但 onComplete也不会接收。

    订阅关系

    上游和下游可以有订阅关系,这种关系也可以被切断

    // 上游 Observable 被观察者
    Observable.create(new ObservableOnSubscribe<Integer>() {
        // 发射器 发射事件
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            log("上游发射事件");
            // 发射事件
            emitter.onNext(1);
            emitter.onNext(2);
            log("上游发射完成");
        }
    }).subscribe(new Observer<Integer>() {
        // 下游 Observer 观察者 处理事件
        Disposable disposable;
        @Override
        public void onSubscribe(Disposable d) {
           // 事件被订阅
            disposable = d;
        }
        @Override
        public void onNext(Integer integer) {
            log("下游处理事件 onNext " + integer);
            // 接收上游的一个时间之后,就切断下游,让下游不再接收,但上游可以继续发
            disposable.dispose();
            // 实际用法,可在 onDestroy 中 使用上面方法,切断下游
        }
        @Override
        public void onError(Throwable e) {
        }
        @Override
        public void onComplete() {
        }
    });
    //---------
    结果
    上游发射事件
    下游处理事件 onNext 1
    上游发射完成
    

    可以看到事件2 没有被接收了,但还是会发送的,只是不接收。

    总结 Part

    • RxJava 是基于事件流思想的,有起点和终点,所以一旦满足起点和终点这样的需求,都可以使用 RxJava 来实现。
    • 标准中的观察者设计模式,一个被观察者对应多个观察者,多次注册。RxJava 是改装后的观察者设计模式,一个被观察者,一个订阅,一个观察者。
    • RxJava中上游 Observable 被观察者,下游 Observer 观察者
    • ObservableEmitter<emitter> emitter 发射器 发射事件
    • 可以拆分写,也可以链式调用
    • RxJava 的大致流程
    • RxJava 发射器方法的使用注意点
      • onComplete/onError 之后不会再接收后面发射的内容;
      • onComplete 后再 onError 会报错;先发射 onError 再发射 onComplete 不会报错,但 onComplete 也不会接收了
    • Disposable 切断下游,不再接收上游发射的事件

    写在后面

    其实使用 RxJava 比较久了,是配合 Retrofit 使用也好,还是用它来处理线程等等,都没有系统的去学习其架构思想,最近重新学习一下搞一搞。

    相关文章

      网友评论

        本文标题:RxJava 的初理解和基本使用

        本文链接:https://www.haomeiwen.com/subject/qjrmyktx.html