美文网首页
RxJava使用之认识RxJava

RxJava使用之认识RxJava

作者: MengkZhang | 来源:发表于2019-04-29 23:20 被阅读0次

写在前面:由于公司拖欠工资,2月的工资都没拿到,所以最近出来面试(已收两家offer但薪资都很低),毫无疑问,每次面试都问到了RxJava,所以打算学习一下RxJava的具体使用。

image.png

RxJava已经出来很久了,也是一直在用,但是总感觉没有完全掌握它,所以花了点时间也阅读了很多文章以及官方的文档,决定对其好好总结一番分享点有价值的文章,写一个系列,对于RxJava1.x就不做讨论了过去的就让它过去吧0.0,在写本博客时GitHub上最新的RxJava版本是2.2.2,地址:https://github.com/ReactiveX/RxJava

介绍以下知识点

①什么是RxJava

②RxJava的优势是什么

③RxJava如何使用

④主要用来做什么?

一、认识RxJava及RxJava的优势

首先来看第一个问题,RxJava到底是个什么东东?

官网给出的描述是这样的:RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.

翻译过来就是:RxJava是一个为使用可观测的序列来组成异步的、基于事件的程序而产生的库。说白了RxJava就是用来解决异步的,解决异步的方法有很多为什么RxJava这么流行呢?肯定它有它强于其它方法的优势。那咱们来看看它的优势是什么?

1.函数式风格:对可观察数据流使用无副作用的输入输出函数,避免了程序里错综复杂的状态

2.代码书写逻辑清晰:Rx的操作符通通常可以将复杂的难题简化为逻辑非常简单的代码,可读性非常强,随着程序逻辑的复杂,依然保持简洁,解耦了各个模块操作,单一化,不嵌套。

3.异步错误处理:传统的try/catch没办法处理异步计算,Rx提供了合适的错误处理机制
4.轻松使用并发:Rx的Observables和Schedulers让开发者可以摆脱底层的线程同步和各种并发问题

5.轻量级框架、支持Java 8 lambda、支持Java 6+和Android 2.3+

有没有感觉很厉害,有木有被RxJava的气场震慑到?

image.png

二、RxJava的使用

清楚了RxJava的定义以及优点之后,接着我们来看下RxJava如何使用,对于RxJava的使用首先必须明白以下三个概念

1.Observable(被观察者)

2.Observer(观察者)

3.Subscribe(订阅)

为了更好的理解这三者之间的协作可以看下图

image.png

其中上游是Observable,下游是Observer,上游与下游建立连接是通过Subscribe方法建立连接的,这样理解是不是更好理解?

对于使用RxJava总共有三个步骤,可以参考上图

①创建上游的Observable

②创建下游的Observer

③建立连接

说了这么多,来搞一波代码尝尝咸淡

首先我们在Gradle中做如下配置

implementation 'io.reactivex.rxjava2:rxjava:2.2.2'

接着就可以肆无忌惮的装逼了,按照上述说的三个步骤代码如下

    //创建上游的Observable
    Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> emitter) throws Exception {
            emitter.onNext("Hello");
            emitter.onNext("World");
            emitter.onNext("!!!!!!");

            emitter.onComplete();
        }
    });

    //创建下游的Observer
    Observer<String> observer = new Observer<String>() {
        @Override
        public void onSubscribe(Disposable d) {

            Log.e(TAG,"onSubscribe");

        }

        @Override
        public void onNext(String s) {

            Log.e(TAG,"onNext : " + s);

        }

        @Override
        public void onError(Throwable e) {

            Log.e(TAG,"onError");

        }

        @Override
        public void onComplete() {

            Log.e(TAG,"onComplete");

        }
    };

    //连接上下游
    observable.subscribe(observer);

程序执行结果

    2019-04-29 22:23:12.222 10151-10151/com.zhang.rxjavademo1 E/MainActivity: onSubscribe
    2019-04-29 22:23:12.224 10151-10151/com.zhang.rxjavademo1 E/MainActivity: onNext : Hello
    2019-04-29 22:23:12.224 10151-10151/com.zhang.rxjavademo1 E/MainActivity: onNext : World
    2019-04-29 22:23:12.224 10151-10151/com.zhang.rxjavademo1 E/MainActivity: onNext : !!!!!!
    2019-04-29 22:23:12.224 10151-10151/com.zhang.rxjavademo1 E/MainActivity: onComplete

我们来分析下执行的过程首先我们分别用代码创建了上游的Observable对象和下游的Observer对象,创建好这两个对象之后通过Observable对象的subscribe()方法将上游与下游建立了连接。建立连接成功之后会调用下游observer的onSubscribe回调走onSubscribe方法预示着上游与下游建立了连接,此时打印了“onSubscribe”,之后上游会依次发送三个事件,代码如下

emitter.onNext("Hello");
emitter.onNext("Word");
emitter.onNext("!!!");
emitter.onComplete();

因为上游与下游已经建立了连接所以下游会收到这三个事件,对应的会走下游observer的onNext方法的回调,所以会依次打印”Hello”、”Word”、”!!!”,之后上游发送发送emitter.onComplete();预示着上游发送事件结束,此时会调用下游的onComplete方法的回调,此时会打印onComplete。以上就是一个简单的RxJava的小案例。
当然对于刚接触到RxJava的同学可能还是有点陌生,那接下来咱们就来把这个案例理解透彻,首先来看看两个对象ObservableEmitter和Disposable这两个对象,其中Emitter是发射的意思,ObservableEmitter的源码实现中共有三个方法:

onNext:用来发送数据,可多次调用,每调用一次发送一条数据
onError:用来发送异常通知,只发送一次,若多次调用则第二次调用时报错
onComplete:用来发送完成通知,只发送一次,若多次调用只发送第一条

在一个正确运行的事件序列中,onCompleted() 和 onError() 必须唯一并且互斥,数据在发送时,出现异常可以调用onError发送异常通知也可以不调用,因为其所在的方法subscribe会抛出异常,若数据在全部发送完之后均正常可以调用onComplete发送完成通知;其中,onError与onComplete不做强制性调用,并且两者是事件序列中的最后一个。Observable可以发送无限个onNext, 观察者也可以接收无限个onNext。Observable发送了一个onComplete(或者onError)后,可以继续发送onComplete(或者onError)后续事件,但观察者收到onComplete(或者onError)后不再接收事件。Observable可以不发送onComplete或onError。

对象Observer中的三个方法(onNext,onError,onComplete)正好与Emitter中的三个方法相对应,分别对Emitter中对应方法的行为作出响应。

Emitter调用onNext发送数据时,Observer会通过onNext接收数据。
Emitter调用onError发送异常通知时,Observer会通过onError接收异常通知,此时不再接收上游发送的数据(此时上游是可以发送数据的)
Emitter调用onComplete发送完成通知时,Observer会通过onComplete接收完成通知,并且不再接收上游发送的数据(此时上游是可以发送数据的)

1.中间发送一个oncomplete事件,原理图如下:

image.png
    //创建上游Observable
    Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> emitter) throws Exception {

            log("上游 发射--a");
            emitter.onNext("上游 发射--a");

            log("上游 发射--b");
            emitter.onNext("上游 发射--b");

            emitter.onComplete();

            log("上游 发射--c");
            emitter.onNext("上游 发射--c");

            log("上游 发射--d");
            emitter.onNext("上游 发射--d");

        }
    });

    //创建下游Observer
    Observer<String> observer = new Observer<String>() {
        @Override
        public void onSubscribe(Disposable d) {

            log("下游 onSubscribe");

        }

        @Override
        public void onNext(String s) {

            log("下游 onNext :" + s);

        }

        @Override
        public void onError(Throwable e) {

            log("下游 onError");

        }

        @Override
        public void onComplete() {

            log("下游 onComplete");

        }
    };

    //连接上下游
    observable.subscribe(observer);

程序执行结果

    2019-04-29 22:34:16.577 10486-10486/com.zhang.rxjavademo1 E/MainActivity: 下游 onSubscribe
    2019-04-29 22:34:16.577 10486-10486/com.zhang.rxjavademo1 E/MainActivity: 上游 发射--a
    2019-04-29 22:34:16.580 10486-10486/com.zhang.rxjavademo1 E/MainActivity: 下游 onNext :上游 发射--a
    2019-04-29 22:34:16.580 10486-10486/com.zhang.rxjavademo1 E/MainActivity: 上游 发射--b
    2019-04-29 22:34:16.580 10486-10486/com.zhang.rxjavademo1 E/MainActivity: 下游 onNext :上游 发射--b
    2019-04-29 22:34:16.582 10486-10486/com.zhang.rxjavademo1 E/MainActivity: 下游 onComplete
    2019-04-29 22:34:16.582 10486-10486/com.zhang.rxjavademo1 E/MainActivity: 上游 发射--c
    2019-04-29 22:34:16.582 10486-10486/com.zhang.rxjavademo1 E/MainActivity: 上游 发射--d

从日志中可以看出在上游发送onComplete方法之后,上游还是正常的发送事件,但是下游却没有接收到上游发送的事件。

2.中间发送一个onerror,原理图如下:

image.png

代码只需修改上游的发送事件的代码:

    //创建上游Observable
    Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> emitter) throws Exception {

            log("上游 发射--a");
            emitter.onNext("上游 发射--a");

            log("上游 发射--b");
            emitter.onNext("上游 发射--b");

            emitter.onError(new NullPointerException());

            log("上游 发射--c");
            emitter.onNext("上游 发射--c");

            log("上游 发射--d");
            emitter.onNext("上游 发射--d");

        }
    });

    //创建下游Observer
    Observer<String> observer = new Observer<String>() {
        @Override
        public void onSubscribe(Disposable d) {

            log("下游 onSubscribe");

        }

        @Override
        public void onNext(String s) {

            log("下游 onNext :" + s);

        }

        @Override
        public void onError(Throwable e) {

            log("下游 onError");

        }

        @Override
        public void onComplete() {

            log("下游 onComplete");

        }
    };

    //连接上下游
    observable.subscribe(observer);

程序执行结果

    2019-04-29 22:40:52.290 10822-10822/? E/MainActivity: 下游 onSubscribe
    2019-04-29 22:40:52.290 10822-10822/? E/MainActivity: 上游 发射--a
    2019-04-29 22:40:52.292 10822-10822/? E/MainActivity: 下游 onNext :上游 发射--a
    2019-04-29 22:40:52.292 10822-10822/? E/MainActivity: 上游 发射--b
    2019-04-29 22:40:52.292 10822-10822/? E/MainActivity: 下游 onNext :上游 发射--b
    2019-04-29 22:40:52.292 10822-10822/? E/MainActivity: 下游 onError
    2019-04-29 22:40:52.293 10822-10822/? E/MainActivity: 上游 发射--c
    2019-04-29 22:40:52.293 10822-10822/? E/MainActivity: 上游 发射--d

从日志中可以看出在发送了onError事件后,上游是可以发送数据的,但是下游将不再接收上游发送的数据。

3.发送多个onError或onComple
前面我们说到onComplete和onError必须唯一并且互斥若发送多个onComplete是可以正常运行的, 依然是收到第一个onComplete就不再接收了, 但若是发送多个onError, 则收到第二个onError事件会导致程序会崩溃,这一点留给大家去验证。

了解完ObservableEmitter之后我们再来

了解一下Disposable 这个对象,Disposable这个对象是用来切断上游与下游的连接的一个对象,切断之后上游可以继续发送事件,但是下游将不会再收到上游发送的事件 ##,

废话不多说咱们来看一段代码:

    //创建上游Observable
    Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> emitter) throws Exception {

            log("上游 发射--a");
            emitter.onNext("上游 发射--a");

            log("上游 发射--b");
            emitter.onNext("上游 发射--b");

            log("上游 发射--c");
            emitter.onNext("上游 发射--c");

            log("上游 发射--d");
            emitter.onNext("上游 发射--d");

        }
    });

    //创建下游Observer
    Observer<String> observer = new Observer<String>() {

        Disposable mDisposable;

        @Override
        public void onSubscribe(Disposable d) {

            mDisposable = d;
            log("下游 onSubscribe");

        }

        @Override
        public void onNext(String s) {

            if (s.equals("上游 发射--c")) {
                mDisposable.dispose();
            }

            log("下游 onNext :" + s);

        }

        @Override
        public void onError(Throwable e) {

            log("下游 onError");

        }

        @Override
        public void onComplete() {

            log("下游 onComplete");

        }
    };

    //连接上下游
    observable.subscribe(observer);

程序执行结果

    2019-04-29 22:45:20.821 10975-10975/com.zhang.rxjavademo1 E/MainActivity: 下游 onSubscribe
    2019-04-29 22:45:20.821 10975-10975/com.zhang.rxjavademo1 E/MainActivity: 上游 发射--a
    2019-04-29 22:45:20.823 10975-10975/com.zhang.rxjavademo1 E/MainActivity: 下游 onNext :上游 发射--a
    2019-04-29 22:45:20.823 10975-10975/com.zhang.rxjavademo1 E/MainActivity: 上游 发射--b
    2019-04-29 22:45:20.823 10975-10975/com.zhang.rxjavademo1 E/MainActivity: 下游 onNext :上游 发射--b
    2019-04-29 22:45:20.823 10975-10975/com.zhang.rxjavademo1 E/MainActivity: 上游 发射--c
    2019-04-29 22:45:20.824 10975-10975/com.zhang.rxjavademo1 E/MainActivity: 下游 onNext :上游 发射--c
    2019-04-29 22:45:20.824 10975-10975/com.zhang.rxjavademo1 E/MainActivity: 上游 发射--d

从日志中我们可以看出,在调用了dispose之后,上游的发送是没有受影响的,但是下游收不到上游发送的数据。

对于之前所述的代码写熟练之后就是经常说的链式调用

   Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> emitter) throws Exception {
            log("上游 发射--a");
            emitter.onNext("上游 发射--a");

            log("上游 发射--b");
            emitter.onNext("上游 发射--b");

            log("上游 发射--c");
            emitter.onNext("上游 发射--c");

            log("上游 发射--d");
            emitter.onNext("上游 发射--d");
        }
    })
            .subscribe(new Observer<String>() {Disposable mDisposable;

                @Override
                public void onSubscribe(Disposable d) {

                    mDisposable = d;
                    log("下游 onSubscribe");

                }

                @Override
                public void onNext(String s) {

                    if (s.equals("上游 发射--c")) {
                        mDisposable.dispose();
                    }

                    log("下游 onNext :" + s);

                }

                @Override
                public void onError(Throwable e) {

                    log("下游 onError");

                }

                @Override
                public void onComplete() {

                    log("下游 onComplete");

                }
            });

好了现在有个大致的印象RxJava到底可以用在什么地方,后续会有一系列关于RxJava的文章来讲解具体的用法,程序源码:https://github.com/MengkZhang/RxJavaDemo1

image.png

相关文章

  • RxJava使用之认识RxJava

    写在前面:由于公司拖欠工资,2月的工资都没拿到,所以最近出来面试(已收两家offer但薪资都很低),毫无疑问,每次...

  • Rxjava入门与使用

    认识 rxjava RxJava是 ReactiveX 在JVM上的一个实现,ReactiveX使用Observa...

  • 无标题文章

    [TOC] 什么是rxjava 什么是rxjava 什么是rxjava 什么是rxjava 什么是rxjava 什...

  • Rxjava系列(六) RxJava2.0操作符详解

    Rxjava2.0概述 通过前面的文章介绍,读者对RxJava2.0应该有了初步的认识。RxJava2.0相对1....

  • Rxjava学习笔记

    1.认识 rxjava RxJava是 ReactiveX 在JVM上的一个实现,ReactiveX使用Obser...

  • 认识 RxJava

    RxJava 是一个针对响应式编程思想的一个框架,其本质是利用了观察者模式,充分的利用的函数式编程的思想,学习此框...

  • RxJava2

    一、RxJava GitHub: RxJava2Demo 二、RxJava的概念 RxJava RxAndroid...

  • rxjava2+retorfit.md

    [TOC] ## 什么是rxjava ## 什么是rxjava ## 什么是rxjava ## 什么是rxjava...

  • 学习RxJava之Lambda表达式

    RxJava代码中,除了简洁的链式调用之外,lambda表达式的引入使代码显得更酷了。但是方便的同时带来的问题就是...

  • RxJava学习笔记

    RxJava Rxjava的GitHub官网上是这样介绍rxjava的:RxJava is a Java VM i...

网友评论

      本文标题:RxJava使用之认识RxJava

      本文链接:https://www.haomeiwen.com/subject/grdpnqtx.html