美文网首页
RxJava2.0从入门到放弃(一)

RxJava2.0从入门到放弃(一)

作者: SheepYan9 | 来源:发表于2017-08-22 17:31 被阅读247次

前言

什么是函数式编程?RxJava是什么?什么是观察者模式?巴拉巴拉要了解的东西太多了。不过不用过于纠结,这些可以当你掌握了RxJava的基本用法再去深究,现在不用过于追求理论上的,太过于形而上学的东西,先实践出真知。

使用RxJava2.0之前是不是需要学习RxJava1.0?

我认为:如果你想「找不同」的话可以反过去学习RxJava1.0,比较他们的区别。否则的话,直接使用RxJava2.0就好了。如果你学RxJava1.0这一部分就不用看啦。

使用之前需要在build.gradle添加依赖:

    //具体版本以github上的为依据
    compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
    compile 'io.reactivex.rxjava2:rxjava:2.1.2'

正文

看了很多关于RxJava的教程,一开始都会长篇大论的跟你讲解观察者模式,什么是Observable,什么是Observer...一堆理论,绕老绕去,总能成功的把我绕晕。
下面是我个人的理解是这样的:

我的理解:被观察者 和 观察者都是两条流动的水管的关系,如果尝试着用GUI的方式来表达,应该是这样子的:

上游下游

对应的『上游』指的是『事件的产生』,『下游』指的是『事件的响应』。『上游』和『下游』通过一个『渠道』来连接。连接后的『上游』产生的事件 1,2,3,4对应产生『下游』事件1,2,3,4的反应。

在RxJava中『上游』『下游』所对应的是『Observable』和『Observer』,它们之间的『链接管道』就是『subscribe()』方法,因此在RxJava中用代码来表示他们的上下游关系就是:

第一步:创建一个『上游』---->Observable

       //创建一个『上游』
        Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                Log.i(TAG, "emitter"+1);
                e.onNext(1);
                Log.i(TAG, "emitter"+2);
                e.onNext(2);
                Log.i(TAG, "emitter"+3);
                e.onNext(3);
                Log.i(TAG, "emitter"+4);
                e.onNext(4);
                Log.i(TAG, "emitter"+"complete");
                e.onComplete();
            }
        });

第二步:穿件一个『下游』---->Observer

        //创建一个『下游』
        Observer<Integer> observer = new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.i(TAG, "onSubscribe..."+d.isDisposed());
            }

            @Override
            public void onNext(Integer integer) {
                Log.i(TAG,"onNext..." + integer);

            }

            @Override
            public void onError(Throwable e) {
                Log.i(TAG,"onError..."+e.getMessage());
            }

            @Override
            public void onComplete() {
                Log.i(TAG,"onComplete");
            }
        };

第三步:通过『链接渠道』进行连接起来

        //通过『连接管道』连接起来
        observable.subscribe(observer);

最后:看一下结果:

08-22 15:13:53.552 17237 17237 I RxTag   : onSubscribe...false
08-22 15:13:53.552 17237 17237 I RxTag   : onNext...1
08-22 15:13:53.552 17237 17237 I RxTag   : onNext...2
08-22 15:13:53.552 17237 17237 I RxTag   : onNext...3
08-22 15:13:53.552 17237 17237 I RxTag   : onNext...4
08-22 15:13:53.552 17237 17237 I RxTag   : onComplete

然后也可以用超级炫酷无敌的链式编程写下来就是这样:

 Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                Log.i(TAG, "emitter"+1);
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
                e.onNext(4);
                e.onComplete();
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.i(TAG, "onSubscribe..." + d.isDisposed());
            }

            @Override
            public void onNext(Integer integer) {
                Log.i(TAG, "onNext..." + integer);
            }

            @Override
            public void onError(Throwable e) {
                Log.i(TAG, "onError..." + e.getMessage());
            }

            @Override
            public void onComplete() {
                Log.i(TAG, "onComplete");
            }
        });

接下来,我们就可以慢慢了解这几个东东ObservableEmitter Disposable以及onSubscribe onNext onComplete onError 的关系

ObservableEmitter: Emitter在翻译过来是『发射』的意思.
我们稍微扒一扒源码就会发现 这是一个Interface 继承自EmitterEmitter这个接口有三个抽象方法:

public interface Emitter<T> {

    /**
     * Signal a normal value.
     * @param value the value to signal, not null
     */
    void onNext(@NonNull T value);

    /**
     * Signal a Throwable exception.
     * @param error the Throwable to signal, not null
     */
    void onError(@NonNull Throwable error);

    /**
     * Signal a completion.
     */
    void onComplete();
}

也就是说Emitter可以『发射』onNext onError onComplete三个事件给『下游』观察者。

但是,你发射事件也有一些规则需要遵循:

  • 『上游』通过Emitter可以无限制的『发射』onNext事件,『下游』可以无限的接受并且响应onNext事件.
  • 『上游』的onComplete onError方法的『发射』是唯一并且互斥的.
    • 如果在『上游』调用emitter.onComplete之后仍然『发射』了其他事件,那么『下游』在onComplete事件之后就不会有任何响应,但是『上游』在emitter.onComplete后的代码依然会执行。
    • emitter.onErroremitter.onComplete一样。
      如果在『上游』调用emitter.onComplete之后仍然『发射』了其他事件,那么『下游』在onComplete事件之后就不会有任何响应,但是『上游』在emitter.onComplete后的代码依然会执行。
    • 『上游』可以选择不发送onError onComplete事件.
    • 最为关键的事onErroronComplete只能发送一个事件,也就是说只能发送一个onError或者onComplete,不能先发一个onError 再发一个onComplete,也不能先发一个onComplete再发一个onError
    • 如果在emitter.onError或者emitter.onComplete之后再发射emitter.onError这会导致app Crash哦。但是多次发射emitter.onComplete不会导致app crash

代码中验证这个规则:

   Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                for(int i =0;i<8;i++){
                    Log.i(TAG, "emitter"+i);
                    if(i == 3){
                        Log.i(TAG, "emitter "+"complete");
                        e.onComplete();
                    }
                    e.onNext(i);
                }
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.i(TAG, "onSubscribe..." + d.isDisposed());
            }

            @Override
            public void onNext(Integer integer) {
                Log.i(TAG, "onNext..." + integer);
            }

            @Override
            public void onError(Throwable e) {
                Log.i(TAG, "onError..." + e.getMessage());
            }

            @Override
            public void onComplete() {
                Log.i(TAG, "onComplete");
            }
        });

可以看到打印结果是这样的:

08-22 15:59:58.320 19476 19476 I RxTag   : onSubscribe...false
08-22 15:59:58.320 19476 19476 I RxTag   : emitter0
08-22 15:59:58.321 19476 19476 I RxTag   : onNext...0
08-22 15:59:58.321 19476 19476 I RxTag   : emitter1
08-22 15:59:58.321 19476 19476 I RxTag   : onNext...1
08-22 15:59:58.321 19476 19476 I RxTag   : emitter2
08-22 15:59:58.321 19476 19476 I RxTag   : onNext...2
08-22 15:59:58.321 19476 19476 I RxTag   : emitter3
08-22 15:59:58.322 19476 19476 I RxTag   : emitter complete
08-22 15:59:58.322 19476 19476 I RxTag   : onComplete
08-22 15:59:58.322 19476 19476 I RxTag   : emitter4
08-22 15:59:58.322 19476 19476 I RxTag   : emitter5
08-22 15:59:58.322 19476 19476 I RxTag   : emitter6
08-22 15:59:58.322 19476 19476 I RxTag   : emitter7

如果用图形表示的话可以这样:

onNext:

onCompleted:

onError:

显而易见:在『下游』接收到onComplete事件后就不再接受任何事件,但是『上游』依然执行emitter.onComplete之后的代码。`onError`也是这样,这里就不进行验证了

说完ObservableEmitteronNext onComplete onError之间的关系之后再来说说Disposable这个玩意儿。

Disposable:翻译过来不是『一次性的』,在这里我们可以理解为这个是『链接管道的开关』。

按照惯例扒一扒源码发现Disposable也是一个interface

public interface Disposable {
    /**
     * Dispose the resource, the operation should be idempotent.
     */
    void dispose();

    /**
     * Returns true if this resource has been disposed.
     * @return true if this resource has been disposed
     */
    boolean isDisposed();
}

disposable.dispose:就代表你你关上了『链接管道』的开关。当你关上开关了就代表『下游』不在响应『上游』的任何事件。
disposable.isDisposed:返回你这个『链接管道』目前是开还是关的状态。返回True --> 代表关上了,返回Flase --> 代表还是开着呢。

毛主席说过实践出真知,那么在代码里面验证一下呗:

  Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                for(int i =0;i<8;i++){
                    Log.i(TAG, "emitter"+i);
                    e.onNext(i);
                }
            }
        }).subscribe(new Observer<Integer>() {
            private Disposable mDisposable;
            @Override
            public void onSubscribe(Disposable d) {
                mDisposable = d;
                Log.i(TAG, "onSubscribe..." + d.isDisposed());
            }

            @Override
            public void onNext(Integer integer) {
                Log.i(TAG, "onNext..." + integer);
                if(integer == 3){
                    Log.i(TAG, "Disposable..." + mDisposable.isDisposed());
                    mDisposable.dispose();
                    Log.i(TAG, "Disposable..." + mDisposable.isDisposed());
                }
            }

            @Override
            public void onError(Throwable e) {
                Log.i(TAG, "onError..." + e.getMessage());
            }

            @Override
            public void onComplete() {
                Log.i(TAG, "onComplete");
            }
        });

再来看一看输出结果:

08-22 16:26:22.832 21387 21387 I RxTag   : onSubscribe...false
08-22 16:26:22.832 21387 21387 I RxTag   : emitter0
08-22 16:26:22.832 21387 21387 I RxTag   : onNext...0
08-22 16:26:22.832 21387 21387 I RxTag   : emitter1
08-22 16:26:22.832 21387 21387 I RxTag   : onNext...1
08-22 16:26:22.832 21387 21387 I RxTag   : emitter2
08-22 16:26:22.832 21387 21387 I RxTag   : onNext...2
08-22 16:26:22.833 21387 21387 I RxTag   : emitter3
08-22 16:26:22.833 21387 21387 I RxTag   : onNext...3
08-22 16:26:22.833 21387 21387 I RxTag   : Disposable...false
08-22 16:26:22.833 21387 21387 I RxTag   : Disposable...true
08-22 16:26:22.833 21387 21387 I RxTag   : emitter4
08-22 16:26:22.833 21387 21387 I RxTag   : emitter5
08-22 16:26:22.833 21387 21387 I RxTag   : emitter6
08-22 16:26:22.833 21387 21387 I RxTag   : emitter7

再次证明了Disposable控制着这个『链接管道』开关,一旦你关上开关了就代表『下游』不在响应『上游』的任何事件,但是『上游』依然会继续执行。

讲到这,可能你并没有发现RxJava的好处,却觉得除了链式会酷一点完全没啥优势啊。那么我们可以慢慢来先看看Observable.subscribe()这方法的几个重载方法吧。

public final Disposable subscribe(){}
public final Disposable subscribe(Consumer<? super T> onNext){}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,Action onComplete){}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,Action onComplete, Consumer<? super Disposable> onSubscribe){}
public final void subscribe(Observer<? super T> observer) 

你可以只接受onNext() 或者选择性接受onNext()和onError()....

        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                for(int i =0;i<8;i++){
                    Log.i(TAG, "emitter"+i);
                    e.onNext(i);
                }
            }
        }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.i(TAG, "onNext..." + integer);
            }
        });

是不是瞬间简单起来了。
要想更加简单那么我们来一个kotlin的版本,简洁的到怀疑人生:

Observable.create(ObservableOnSubscribe<Int> { e ->
            for (i in 0..7) {
                Log.i(TAG, "emitter" + i)
                e.onNext(i)
            }
        }).subscribe { integer -> Log.i(TAG, "onNext..." + integer) }

总结

这一部分主要讲解RxJava中关于『上游』『下游』『链接管道』『开关』的一些理解,只是浅显的最基本的进行一个被观察者和观察者的订阅。下一部分将深一步的介绍RxJava在线程调度的作用。关于管道的理解是参考了http://www.jianshu.com/p/464fa025229e 他的讲解,觉得特别好就借鉴来学习。

RxJava2.0从入门到放弃(二)

相关文章

网友评论

      本文标题:RxJava2.0从入门到放弃(一)

      本文链接:https://www.haomeiwen.com/subject/mmildxtx.html