美文网首页
RxJava2.0从入门到放弃(一)

RxJava2.0从入门到放弃(一)

作者: SheepYan9 | 来源:发表于2017-08-22 17:31 被阅读247次

    前言

    什么是函数式编程?RxJava是什么?什么是观察者模式?巴拉巴拉要了解的东西太多了。不过不用过于纠结,这些可以当你掌握了RxJava的基本用法再去深究,现在不用过于追求理论上的,太过于形而上学的东西,先实践出真知。

    使用RxJava2.0之前是不是需要学习RxJava1.0?

    我认为:如果你想「找不同」的话可以反过去学习RxJava1.0,比较他们的区别。否则的话,直接使用RxJava2.0就好了。如果你学RxJava1.0这一部分就不用看啦。

    使用之前需要在build.gradle添加依赖:

        //具体版本以github上的为依据
        compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
        compile 'io.reactivex.rxjava2:rxjava:2.1.2'
    

    正文

    看了很多关于RxJava的教程,一开始都会长篇大论的跟你讲解观察者模式,什么是Observable,什么是Observer...一堆理论,绕老绕去,总能成功的把我绕晕。
    下面是我个人的理解是这样的:

    我的理解:被观察者 和 观察者都是两条流动的水管的关系,如果尝试着用GUI的方式来表达,应该是这样子的:

    上游下游

    对应的『上游』指的是『事件的产生』,『下游』指的是『事件的响应』。『上游』和『下游』通过一个『渠道』来连接。连接后的『上游』产生的事件 1,2,3,4对应产生『下游』事件1,2,3,4的反应。

    在RxJava中『上游』『下游』所对应的是『Observable』和『Observer』,它们之间的『链接管道』就是『subscribe()』方法,因此在RxJava中用代码来表示他们的上下游关系就是:

    第一步:创建一个『上游』---->Observable

           //创建一个『上游』
            Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                    Log.i(TAG, "emitter"+1);
                    e.onNext(1);
                    Log.i(TAG, "emitter"+2);
                    e.onNext(2);
                    Log.i(TAG, "emitter"+3);
                    e.onNext(3);
                    Log.i(TAG, "emitter"+4);
                    e.onNext(4);
                    Log.i(TAG, "emitter"+"complete");
                    e.onComplete();
                }
            });
    

    第二步:穿件一个『下游』---->Observer

            //创建一个『下游』
            Observer<Integer> observer = new Observer<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {
                    Log.i(TAG, "onSubscribe..."+d.isDisposed());
                }
    
                @Override
                public void onNext(Integer integer) {
                    Log.i(TAG,"onNext..." + integer);
    
                }
    
                @Override
                public void onError(Throwable e) {
                    Log.i(TAG,"onError..."+e.getMessage());
                }
    
                @Override
                public void onComplete() {
                    Log.i(TAG,"onComplete");
                }
            };
    

    第三步:通过『链接渠道』进行连接起来

            //通过『连接管道』连接起来
            observable.subscribe(observer);
    

    最后:看一下结果:

    08-22 15:13:53.552 17237 17237 I RxTag   : onSubscribe...false
    08-22 15:13:53.552 17237 17237 I RxTag   : onNext...1
    08-22 15:13:53.552 17237 17237 I RxTag   : onNext...2
    08-22 15:13:53.552 17237 17237 I RxTag   : onNext...3
    08-22 15:13:53.552 17237 17237 I RxTag   : onNext...4
    08-22 15:13:53.552 17237 17237 I RxTag   : onComplete
    

    然后也可以用超级炫酷无敌的链式编程写下来就是这样:

     Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                    Log.i(TAG, "emitter"+1);
                    e.onNext(1);
                    e.onNext(2);
                    e.onNext(3);
                    e.onNext(4);
                    e.onComplete();
                }
            }).subscribe(new Observer<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {
                    Log.i(TAG, "onSubscribe..." + d.isDisposed());
                }
    
                @Override
                public void onNext(Integer integer) {
                    Log.i(TAG, "onNext..." + integer);
                }
    
                @Override
                public void onError(Throwable e) {
                    Log.i(TAG, "onError..." + e.getMessage());
                }
    
                @Override
                public void onComplete() {
                    Log.i(TAG, "onComplete");
                }
            });
    

    接下来,我们就可以慢慢了解这几个东东ObservableEmitter Disposable以及onSubscribe onNext onComplete onError 的关系

    ObservableEmitter: Emitter在翻译过来是『发射』的意思.
    我们稍微扒一扒源码就会发现 这是一个Interface 继承自EmitterEmitter这个接口有三个抽象方法:

    public interface Emitter<T> {
    
        /**
         * Signal a normal value.
         * @param value the value to signal, not null
         */
        void onNext(@NonNull T value);
    
        /**
         * Signal a Throwable exception.
         * @param error the Throwable to signal, not null
         */
        void onError(@NonNull Throwable error);
    
        /**
         * Signal a completion.
         */
        void onComplete();
    }
    

    也就是说Emitter可以『发射』onNext onError onComplete三个事件给『下游』观察者。

    但是,你发射事件也有一些规则需要遵循:

    • 『上游』通过Emitter可以无限制的『发射』onNext事件,『下游』可以无限的接受并且响应onNext事件.
    • 『上游』的onComplete onError方法的『发射』是唯一并且互斥的.
      • 如果在『上游』调用emitter.onComplete之后仍然『发射』了其他事件,那么『下游』在onComplete事件之后就不会有任何响应,但是『上游』在emitter.onComplete后的代码依然会执行。
      • emitter.onErroremitter.onComplete一样。
        如果在『上游』调用emitter.onComplete之后仍然『发射』了其他事件,那么『下游』在onComplete事件之后就不会有任何响应,但是『上游』在emitter.onComplete后的代码依然会执行。
      • 『上游』可以选择不发送onError onComplete事件.
      • 最为关键的事onErroronComplete只能发送一个事件,也就是说只能发送一个onError或者onComplete,不能先发一个onError 再发一个onComplete,也不能先发一个onComplete再发一个onError
      • 如果在emitter.onError或者emitter.onComplete之后再发射emitter.onError这会导致app Crash哦。但是多次发射emitter.onComplete不会导致app crash

    代码中验证这个规则:

       Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                    for(int i =0;i<8;i++){
                        Log.i(TAG, "emitter"+i);
                        if(i == 3){
                            Log.i(TAG, "emitter "+"complete");
                            e.onComplete();
                        }
                        e.onNext(i);
                    }
                }
            }).subscribe(new Observer<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {
                    Log.i(TAG, "onSubscribe..." + d.isDisposed());
                }
    
                @Override
                public void onNext(Integer integer) {
                    Log.i(TAG, "onNext..." + integer);
                }
    
                @Override
                public void onError(Throwable e) {
                    Log.i(TAG, "onError..." + e.getMessage());
                }
    
                @Override
                public void onComplete() {
                    Log.i(TAG, "onComplete");
                }
            });
    

    可以看到打印结果是这样的:

    08-22 15:59:58.320 19476 19476 I RxTag   : onSubscribe...false
    08-22 15:59:58.320 19476 19476 I RxTag   : emitter0
    08-22 15:59:58.321 19476 19476 I RxTag   : onNext...0
    08-22 15:59:58.321 19476 19476 I RxTag   : emitter1
    08-22 15:59:58.321 19476 19476 I RxTag   : onNext...1
    08-22 15:59:58.321 19476 19476 I RxTag   : emitter2
    08-22 15:59:58.321 19476 19476 I RxTag   : onNext...2
    08-22 15:59:58.321 19476 19476 I RxTag   : emitter3
    08-22 15:59:58.322 19476 19476 I RxTag   : emitter complete
    08-22 15:59:58.322 19476 19476 I RxTag   : onComplete
    08-22 15:59:58.322 19476 19476 I RxTag   : emitter4
    08-22 15:59:58.322 19476 19476 I RxTag   : emitter5
    08-22 15:59:58.322 19476 19476 I RxTag   : emitter6
    08-22 15:59:58.322 19476 19476 I RxTag   : emitter7
    

    如果用图形表示的话可以这样:

    onNext:

    onCompleted:

    onError:

    显而易见:在『下游』接收到onComplete事件后就不再接受任何事件,但是『上游』依然执行emitter.onComplete之后的代码。`onError`也是这样,这里就不进行验证了
    

    说完ObservableEmitteronNext onComplete onError之间的关系之后再来说说Disposable这个玩意儿。

    Disposable:翻译过来不是『一次性的』,在这里我们可以理解为这个是『链接管道的开关』。

    按照惯例扒一扒源码发现Disposable也是一个interface

    public interface Disposable {
        /**
         * Dispose the resource, the operation should be idempotent.
         */
        void dispose();
    
        /**
         * Returns true if this resource has been disposed.
         * @return true if this resource has been disposed
         */
        boolean isDisposed();
    }
    

    disposable.dispose:就代表你你关上了『链接管道』的开关。当你关上开关了就代表『下游』不在响应『上游』的任何事件。
    disposable.isDisposed:返回你这个『链接管道』目前是开还是关的状态。返回True --> 代表关上了,返回Flase --> 代表还是开着呢。

    毛主席说过实践出真知,那么在代码里面验证一下呗:

      Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                    for(int i =0;i<8;i++){
                        Log.i(TAG, "emitter"+i);
                        e.onNext(i);
                    }
                }
            }).subscribe(new Observer<Integer>() {
                private Disposable mDisposable;
                @Override
                public void onSubscribe(Disposable d) {
                    mDisposable = d;
                    Log.i(TAG, "onSubscribe..." + d.isDisposed());
                }
    
                @Override
                public void onNext(Integer integer) {
                    Log.i(TAG, "onNext..." + integer);
                    if(integer == 3){
                        Log.i(TAG, "Disposable..." + mDisposable.isDisposed());
                        mDisposable.dispose();
                        Log.i(TAG, "Disposable..." + mDisposable.isDisposed());
                    }
                }
    
                @Override
                public void onError(Throwable e) {
                    Log.i(TAG, "onError..." + e.getMessage());
                }
    
                @Override
                public void onComplete() {
                    Log.i(TAG, "onComplete");
                }
            });
    

    再来看一看输出结果:

    08-22 16:26:22.832 21387 21387 I RxTag   : onSubscribe...false
    08-22 16:26:22.832 21387 21387 I RxTag   : emitter0
    08-22 16:26:22.832 21387 21387 I RxTag   : onNext...0
    08-22 16:26:22.832 21387 21387 I RxTag   : emitter1
    08-22 16:26:22.832 21387 21387 I RxTag   : onNext...1
    08-22 16:26:22.832 21387 21387 I RxTag   : emitter2
    08-22 16:26:22.832 21387 21387 I RxTag   : onNext...2
    08-22 16:26:22.833 21387 21387 I RxTag   : emitter3
    08-22 16:26:22.833 21387 21387 I RxTag   : onNext...3
    08-22 16:26:22.833 21387 21387 I RxTag   : Disposable...false
    08-22 16:26:22.833 21387 21387 I RxTag   : Disposable...true
    08-22 16:26:22.833 21387 21387 I RxTag   : emitter4
    08-22 16:26:22.833 21387 21387 I RxTag   : emitter5
    08-22 16:26:22.833 21387 21387 I RxTag   : emitter6
    08-22 16:26:22.833 21387 21387 I RxTag   : emitter7
    

    再次证明了Disposable控制着这个『链接管道』开关,一旦你关上开关了就代表『下游』不在响应『上游』的任何事件,但是『上游』依然会继续执行。

    讲到这,可能你并没有发现RxJava的好处,却觉得除了链式会酷一点完全没啥优势啊。那么我们可以慢慢来先看看Observable.subscribe()这方法的几个重载方法吧。

    public final Disposable subscribe(){}
    public final Disposable subscribe(Consumer<? super T> onNext){}
    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {}
    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,Action onComplete){}
    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,Action onComplete, Consumer<? super Disposable> onSubscribe){}
    public final void subscribe(Observer<? super T> observer) 
    

    你可以只接受onNext() 或者选择性接受onNext()和onError()....

            Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                    for(int i =0;i<8;i++){
                        Log.i(TAG, "emitter"+i);
                        e.onNext(i);
                    }
                }
            }).subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    Log.i(TAG, "onNext..." + integer);
                }
            });
    

    是不是瞬间简单起来了。
    要想更加简单那么我们来一个kotlin的版本,简洁的到怀疑人生:

    Observable.create(ObservableOnSubscribe<Int> { e ->
                for (i in 0..7) {
                    Log.i(TAG, "emitter" + i)
                    e.onNext(i)
                }
            }).subscribe { integer -> Log.i(TAG, "onNext..." + integer) }
    

    总结

    这一部分主要讲解RxJava中关于『上游』『下游』『链接管道』『开关』的一些理解,只是浅显的最基本的进行一个被观察者和观察者的订阅。下一部分将深一步的介绍RxJava在线程调度的作用。关于管道的理解是参考了http://www.jianshu.com/p/464fa025229e 他的讲解,觉得特别好就借鉴来学习。

    RxJava2.0从入门到放弃(二)

    相关文章

      网友评论

          本文标题:RxJava2.0从入门到放弃(一)

          本文链接:https://www.haomeiwen.com/subject/mmildxtx.html