美文网首页
RxJava 基本使用

RxJava 基本使用

作者: 翻滚吧王咸鱼 | 来源:发表于2018-05-30 20:52 被阅读0次

    对RxJava 的学习,做一个记录,为以后的面试复习用. 我看一些RxJava 的介绍,个人感觉
    RxJava 这个作者讲的简单明了.
    通过水管来讲

    image.png

    老规矩 先放依赖
    要在Android中使用RxJava2, 先添加Gradle配置:

      //解释一下  implementation 跟compile 的区别
      //compile依赖的确实可以做到依赖传递,但是AS 3.0开始推荐使用implementation取代了compile,
    //依赖传递失效了.  而 ##implement## 的意思是将该依赖隐藏在内部,而不对外部公开. 在 app //mudule 中//使用 implement 依赖的第三方库, 在其他 mudule 是无法调用的,## compile ##android //studio 3.0 版本后废弃该指令 改用 api 代替, api 完全等同于之前的 compile 指令,
    // 也就是普通的依赖, //第三方库在 mudule 中依赖后其他 mudule 都可以使用该库.官方推荐在不影响的前提下优先使用 //implement 指令依赖.
        implementation 'io.reactivex.rxjava2:rxjava:2.1.14'
        implementation 'io.reactivex.rxjava2:rxandroid:2.0.2' 
    

    下面就简单使用的例子

      //Observable 被观察者  subscribe订阅  需要做的事
            // create() 是 RxJava 最基本的创造事件序列的方法
            Observable.create(new ObservableOnSubscribe<Integer>() {
                // 此处传入了一个 OnSubscribe 对象参数
                // 当 Observable 被订阅时,OnSubscribe 的 call() 方法会自动被调用,即事件序列就会依照设定依次被触发
                // 即观察者会依次调用对应事件的复写方法从而响应事件
                // 从而实现被观察者调用了观察者的回调方法 & 由被观察者向观察者的事件传递,即观察者模式
    
                @Override
                public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                   // ObservableEmitter类对象产生事件并通知观察者
                    // ObservableEmitter类介绍
                    // a. 定义:事件发射器
                    // b. 作用:定义需要发送的事件 & 向观察者发送事件
                    emitter.onNext(1);  //发射第一个事件
                    emitter.onNext(2);
                    emitter.onNext(3);
                    emitter.onComplete();
                }
            })
                    //观察者Observer subscribe是建立联系
                    .subscribe(new Observer<Integer>() {  //事件的响应拿到结果
    
                      //  观察者接收事件前,默认最先调用复写 onSubscribe()
                        @Override
                        public void onSubscribe(Disposable d) {
                            Log.e("----->", "subscribe");
                        }
                        // 当被观察者生产Next事件 & 观察者接收到时,会调用该复写方法 进行响应
                        @Override
                        public void onNext(Integer integer) {
                            Log.d("----->", "" + integer);
                        }
                        // 当被观察者生产Error事件& 观察者接收到时,会调用该复写方法 进行响应
                        @Override
                        public void onError(Throwable e) {
                            Log.d("----->", "异常");
                        }
                        // 当被观察者生产Complete事件& 观察者接收到时,会调用该复写方法 进行响应
                        @Override
                        public void onComplete() {
                            Log.d("----->", "complete");
                        }
                    });
    
            //  说明:Subscriber类 = RxJava 内置的一个实现了 Observer 的抽象类,对 Observer 接口进行了扩展
    
    <-- Observable.subscribe(Subscriber) 的内部实现 -->
    
    public Subscription subscribe(Subscriber subscriber) {
        subscriber.onStart();
        // 步骤1中 观察者  subscriber抽象类复写的方法,用于初始化工作
        onSubscribe.call(subscriber);
        // 通过该调用,从而回调观察者中的对应方法从而响应被观察者生产的事件
        // 从而实现被观察者调用了观察者的回调方法 & 由被观察者向观察者的事件传递,即观察者模式
        // 同时也看出:Observable只是生产事件,真正的发送事件是在它被订阅的时候,即当 subscribe() 方法执行时
    }
    
    image.png

    注意: 只有当上游和下游建立连接之后, 上游才会开始发送事件. 也就是调用了subscribe() 方法之后才开始发送事件.

    ObservableEmitter:

    Emitter是发射器的意思,那就很好猜了,这个就是用来发出事件的,它可以发出三种类型的事件,通过调用emitter的onNext(T value)、onComplete()和onError(Throwable error)就可以分别发出next事件、complete事件和error事件。

    但是,请注意,并不意味着你可以随意乱七八糟发射事件,需要满足一定的规则:

    当上游发送了一个onComplete后, 上游onComplete之后的事件将会继续发送, 而下游收到onComplete事件之后将不再继续接收事件.
    当上游发送了一个onError后, 上游onError之后的事件将继续发送, 而下游收到onError事件之后将不再继续接收事件.
    上游可以不发送onComplete或onError.
    最为关键的是onComplete和onError必须唯一并且互斥, 即不能发多个onComplete, 也不能发多个onError, 也不能先发一个onComplete, 然后再发一个onError, 反之亦然
    
    
    注: 关于onComplete和onError唯一并且互斥这一点, 是需要自行在代码中进行控制, 如果你的代码逻辑中违背了这个规则, **并不一定会导致程序崩溃. ** 比如发送多个onComplete是可以正常运行的, 依然是收到第一个onComplete就不再接收了, 但若是发送多个onError, 则收到第二个onError事件会导致程序会崩溃.
    
    Disposable:

    这个单词的字面意思是一次性用品,用完即可丢弃的. 那么在RxJava中怎么去理解它呢, 对应于上面的水管的例子, 我们可以把它理解成两根管道之间的一个机关, 当调用它的dispose()方法时, 它就会将两根管道切断, 从而导致下游收不到事件.注意: 调用dispose()并不会导致上游不再继续发送事件, 上游会继续发送剩余的事件.

    被观察者 Observable的subscribe()具备多个重载的方法

     * public final Disposable subscribe() {}
             // 表示观察者不对被观察者发送的事件作出任何响应(但被观察者还是可以继续发送事件)
    
             public final Disposable subscribe(Consumer<? super T> onNext) {}
             // 表示观察者只对被观察者发送的Next事件作出响应
             public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {}
             // 表示观察者只对被观察者发送的Next事件 & Error事件作出响应
    
             public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {}
             // 表示观察者只对被观察者发送的Next事件、Error事件 & Complete事件作出响应
    
             public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {}
             // 表示观察者只对被观察者发送的Next事件、Error事件 、Complete事件 & onSubscribe事件作出响应
    
             public final void subscribe(Observer<? super T> observer) {}
             // 表示观察者对被观察者发送的任何事件都作出响应
    

    可采用 Disposable.dispose() 切断观察者 与 被观察者 之间的连接

    /**
         * 可采用 Disposable.dispose() 切断观察者 与 被观察者 之间的连接
         * 即观察者 无法继续 接收 被观察者的事件,但被观察者还是可以继续发送事件
         */
        private void Dome2() {
            Observable.create(new ObservableOnSubscribe<Integer>(){
                @Override
                public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                    Log.d("----->", "开始发射:");
                    emitter.onNext(1);  //发射第一个事件
                    emitter.onNext(2);
                    emitter.onNext(3);
                    emitter.onNext(4);
                    emitter.onComplete();
    
                }
            }).subscribe(new Observer<Integer>() {
                // 1\. 定义Disposable类变量
                private Disposable mDisposable;
                @Override
                public void onSubscribe(Disposable d) {
                    Log.d("----->", "接收到事件开始了"  );
                    // 2\. 对Disposable类变量赋值
                    mDisposable = d;
                }
    
                @Override
                public void onNext(Integer integer) {
                    Log.d("----->", "接收到事件"+integer  );
                    if (integer == 2) {
                        // 设置在接收到第二个事件后切断观察者和被观察者的连接
                        mDisposable.dispose();
                        Log.d("----->", "已经切断了连接:" + mDisposable.isDisposed());
                    }
    
    
                }
    
                @Override
                public void onError(Throwable e) {
                    Log.d("---->", "对Error事件作出响应");
                }
    
                @Override
                public void onComplete() {
                    Log.d("---->", "对Complete事件作出响应");
                }
            });
        }
    

    简单学习到这里

    相关文章

      网友评论

          本文标题:RxJava 基本使用

          本文链接:https://www.haomeiwen.com/subject/cczajftx.html