美文网首页
RxJava使用(一)

RxJava使用(一)

作者: Ayres | 来源:发表于2018-01-02 10:14 被阅读0次

最近跟着红橙Darren学习rxjava,跟着分析源码和写一个简单rxjava,很崇拜他的毅力和技术水平,准备跟着他的步伐走,顺便做一些笔记。跟着他分析源码后,根据自己需要有必要做一下总结和RxJava使用笔记。rxjava的使用笔记跟着Season_zlc的一系列文章走。非常感谢各位大佬的分享。

步入正题

盗用Season_zlc的一张图片,感觉这张图还是比较形象容易理解,事件流

image.png
没错就像是水管,上游发送事件1,2,3,下游接收事件1,2,3,上下游通过一定方式连接在一起
引入:
              compile 'io.reactivex.rxjava2:rxjava:2.0.1'
              compile 'io.reactivex.rxjava2:rxandroid:2.0.1'

我们所说的上游和下游分别是Observable和Observer,通过subscribe方法注册连接,Observable和Observer同时也是观察者于被观察者

  //创建一个上游 Observable:
    Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            emitter.onNext(1);
            emitter.onNext(2);
            emitter.onNext(3);
            emitter.onComplete();
        }
    });
    //创建一个下游 Observer
    Observer<Integer> observer = new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.d(TAG, "subscribe");
        }

        @Override
        public void onNext(Integer value) {
            Log.d(TAG, "" + value);
        }

        @Override
        public void onError(Throwable e) {
            Log.d(TAG, "error");
        }

        @Override
        public void onComplete() {
            Log.d(TAG, "complete");
        }
    };
    //建立连接
    observable.subscribe(observer);

运行结果为


image.png

上面写法体现不出rxjava的优点,可以链式调用,如下连在一起

    Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            emitter.onNext(1);
            emitter.onNext(2);
            emitter.onNext(3);
            emitter.onComplete();
        }
    }).subscribe(new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.d(TAG, "subscribe");
        }

        @Override
        public void onNext(Integer value) {
            Log.d(TAG, "" + value);
        }

        @Override
        public void onError(Throwable e) {
            Log.d(TAG, "error");
        }

        @Override
        public void onComplete() {
            Log.d(TAG, "complete");
        }
    });
ObservableEmitter:

Emitter是发射器的意思,那就很好猜了,这个就是用来发出事件的,它可以发出三种类型的事件,通过调用emitter的onNext(T value)、onComplete()和onError(Throwable error)就可以分别发出next事件、complete事件和error事件。

上游可以发送无限个onNext, 下游也可以接收无限个onNext.
当上游发送了一个onComplete后, 上游onComplete之后的事件将会继续发送, 而下游收到onComplete事件之后将不###### 再继续接收事件.
当上游发送了一个onError后, 上游onError之后的事件将继续发送, 而下游收到onError事件之后将不再继续接收事件.
上游可以不发送onComplete或onError.
最为关键的是onComplete和onError必须唯一并且互斥, 即不能发多个onComplete, 也不能发多个onError, 也不能先发一个onComplete, 然后再发一个onError, 反之亦然

注: 关于onComplete和onError唯一并且互斥这一点, 是需要自行在代码中进行控制, 如果你的代码逻辑中违背了这个规则, **并不一定会导致程序崩溃. ** 比如发送多个onComplete是可以正常运行的, 依然是收到第一个onComplete就不再接收了, 但若是发送多个onError, 则收到第二个onError事件会导致程序会崩溃.
盗用一张图


image.png

Disposable, 这个单词的字面意思是一次性用品,用完即可丢弃的. 那么在RxJava中怎么去理解它呢, 对应于上面的水管的例子, 我们可以把它理解成两根管道之间的一个机关, 当调用它的dispose()方法时, 它就会将两根管道切断, 从而导致下游收不到事件.
调用dispose()并不会导致上游不再继续发送事件, 上游会继续发送剩余的事件.

    Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            Log.d(TAG, "emit 1");
            emitter.onNext(1);
            Log.d(TAG, "emit 2");
            emitter.onNext(2);
            Log.d(TAG, "emit 3");
            emitter.onNext(3);
            Log.d(TAG, "emit complete");
            emitter.onComplete();
            Log.d(TAG, "emit 4");
            emitter.onNext(4);
        }
    }).subscribe(new Observer<Integer>() {
        private Disposable mDisposable;
        private int i;

        @Override
        public void onSubscribe(Disposable d) {
            Log.d(TAG, "subscribe");
            mDisposable = d;
        }

        @Override
        public void onNext(Integer value) {
            Log.d(TAG, "onNext: " + value);
            i++;
            if (i == 2) {
                Log.d(TAG, "dispose");
                mDisposable.dispose();
                Log.d(TAG, "isDisposed : " + mDisposable.isDisposed());
            }
        }

        @Override
        public void onError(Throwable e) {
            Log.d(TAG, "error");
        }

        @Override
        public void onComplete() {
            Log.d(TAG, "complete");
        }
    });

结果为:


image.png

从运行结果我们看到, 在收到onNext 2这个事件后, 切断了水管, 但是上游仍然发送了3, complete, 4这几个事件, 而且上游并没有因为发送了onComplete而停止. 同时可以看到下游的onSubscribe()方法是最先调用的.
subscribe()有多个重载的方法:

public final Disposable subscribe() {}
public final Disposable subscribe(Consumer<? super T> onNext) {}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {} 
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action    onComplete) {}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {}
public final void subscribe(Observer<? super T> observer) {}

最后一个带有Observer参数的我们已经使用过了,这里对其他几个方法进行说明.

不带任何参数的subscribe() 表示下游不关心任何事件,你上游尽管发你的数据去吧, 老子可不管你发什么.
带有一个Consumer参数的方法表示下游只关心onNext事件, 其他的事件我假装没看见, 因此我们如果只需要onNext事件可以这么写:
    Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            Log.d(TAG, "emit 1");
            emitter.onNext(1);
            Log.d(TAG, "emit 2");
            emitter.onNext(2);
            Log.d(TAG, "emit 3");
            emitter.onNext(3);
            Log.d(TAG, "emit complete");
            emitter.onComplete();
            Log.d(TAG, "emit 4");
            emitter.onNext(4);
        }
    }).subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            Log.d(TAG, "onNext: " + integer);
        }
    });

其他几个方法同理
结尾:感谢Season_zlc的分享,本人文章只是学习用途,笔记

相关文章

网友评论

      本文标题:RxJava使用(一)

      本文链接:https://www.haomeiwen.com/subject/mgqfnxtx.html