美文网首页
RxJava初探

RxJava初探

作者: kevinsEegets | 来源:发表于2019-12-09 16:19 被阅读0次

    我们在学习RxJava之前要了解一下,为什么使用RxJava, 使用RxJava有什么好处

    RxJava特性:

    轻松的切换线程,流式的API以及有强大的操作符,这使得我们做异步操作时变的简单,不用像以前一样写各种的Handler来回调主线程.只需要一个操作符一行代码就可以搞定.

    我所理解的RxJava的解释

    之前也看过RxJava类的文章,但是一直都是一脸迷茫,观察者被观察者一大堆都搞晕了,后来偶然间看到一位大神的解释,瞬间明白了,他将RxJava的观察者和被观察者比喻为水管的上下游,我更愿意把这个概念理解为电话这头和那头.

    有两部有线电话, 通过一根网线连接, A给B打电话, 我们可以把A理解为上游, B自然就是下游了, 他们之间通过电话线连接

    开始学习

    我们从最简单的RxJava使用入手,先上代码

            val observable = Observable.create(object :ObservableOnSubscribe<Int> {
                override fun subscribe(it: ObservableEmitter<Int>?) {
                    it?.onNext(1)
                    it?.onNext(2)
                    it?.onComplete()
                }
            })
            val observer = object: Observer<Int> {
                override fun onComplete() {
                    debugMsg("onComplete")
                }
    
                override fun onSubscribe(d: Disposable?) {
                    debugMsg("onSubscribe")
                }
    
                override fun onNext(value: Int?) {
                    debugMsg("==",value)
                }
    
                override fun onError(e: Throwable?) {
                    debugMsg("onError")
                }
            }
            observable.subscribe(observer)
    

    根据电话的例子,我们看看上述代码

    上游:

    • observable 可以看成是我们的上游
    • ObservableEmitter<Int> Emitter指发射器的意思,该类的解释就是被观察者发射器,也就是上游的发射器
    • it?.onNext(2) 上游发射器通过方法onNext()方法发射数据

    下游

    • observer 当然就是下游了
    • onSubscribe() 下游开始接收数据就会触发
    • onNext() 下游正式接收上游数据
    • onError() 下游接收到上游发送的异常消息
    • onComplete(() 下游消息接收完成的标志

    上下游连接

    subscribe 上下游通过subscribe连接, 此处需要注意 上下游连接只能连接一次(通俗一点就是 subscribe 只使用一次)

    我们逐行解释一下:

    1: 我们创建一个上游(Observable.create), 然后通过 it.onNext(1) it.onNext(2) it.onComplete(2) 发送了三条消息 "1", "2"以及 "完成" 给下游

    2: 同样我们必须定义一个接受者也就是下游 (object: Observer<Int>) ,用来接收上游发送的消息,因为我们上游发送的消息为Int类型, 所以我们此处接受下游的类型为Int

    3: 通过 observable.subscribe(observer) 连接上下游

    日志输出

    D/com.eegegts: .MainActivity$loadRxjava1$observer$1--->onSubscribe
    D/com.eegegts: .MainActivity$loadRxjava1$observer$1--->== 1
    D/com.eegegts: .MainActivity$loadRxjava1$observer$1--->== 2
    D/com.eegegts: .MainActivity$loadRxjava1$observer$1--->onComplete
    

    根据日志,我们可以看出,当我们的上下游建立连接时,首先会执行 onSubscribe() ,接着通过onNext()接收到我们上游发送的数据 "1", "2", 数据全部接收完成后会执行 onComplete()

    此处需要注意几个地方:

    • 上游不能发送多个onError,发送多个会导致程序崩溃
    • 上游发送多个onComplete,下游只接受一个
    • 上游发送的onComplete和onError必须是互斥
    • 下游接收到onComplete时再不会接受上游事件

    我们用代码解释一下注意的几个地方

    1: 我们在上游添加几个onComplete()

        override fun subscribe(it: ObservableEmitter<Int>?) {
                    it?.onNext(1)
                    it?.onNext(2)
                    it?.onNext(3)
                    it?.onComplete()
                    it?.onComplete()
                    it?.onComplete()
                }
    
            })
            val observer = object: Observer<Int> {
                override fun onComplete() {
                    debugMsg("onComplete")
                }
    
                override fun onSubscribe(d: Disposable?) {
                    debugMsg("onSubscribe")
                }
    
                override fun onNext(value: Int?) {
                    debugMsg("==",value)
                }
    
                override fun onError(e: Throwable?) {
                    debugMsg("onError")
                }
            }
            observable.subscribe(observer)
    

    日志输出

    D/com.eegegts: .MainActivity$loadRxjava1$observer$1--->onSubscribe
    D/com.eegegts: .MainActivity$loadRxjava1$observer$1--->== 1
    D/com.eegegts: .MainActivity$loadRxjava1$observer$1--->== 2
    D/com.eegegts: .MainActivity$loadRxjava1$observer$1--->== 3
    D/com.eegegts: .MainActivity$loadRxjava1$observer$1--->onComplete
    

    可以看到我们的onComplete只输出了一次

    2: 上游只要发送了onComplete, 下游不会再接收后续的消息

     override fun subscribe(it: ObservableEmitter<Int>?) {
                    it?.onNext(1)
                    it?.onNext(2)
                    it?.onNext(3)
                    it?.onComplete()
                    it?.onNext(4)
                }
            })
            val observer = object: Observer<Int> {
                override fun onComplete() {
                    debugMsg("onComplete")
                }
    
                override fun onSubscribe(d: Disposable?) {
                    debugMsg("onSubscribe")
                }
    
                override fun onNext(value: Int?) {
                    debugMsg("==",value)
                }
    
                override fun onError(e: Throwable?) {
                    debugMsg("onError")
                }
            }
            observable.subscribe(observer)
    

    日志输出

    D/com.eegegts: .MainActivity$loadRxjava1$observer$1--->onSubscribe
    D/com.eegegts: .MainActivity$loadRxjava1$observer$1--->== 1
    D/com.eegegts: .MainActivity$loadRxjava1$observer$1--->== 2
    D/com.eegegts: .MainActivity$loadRxjava1$observer$1--->== 3
    D/com.eegegts: .MainActivity$loadRxjava1$observer$1--->onComplete
    

    3: 上游发送的onComplete和onError必须是互斥 以及发送多个onError报错

    val observable = Observable.create(object :ObservableOnSubscribe<Int> {
                override fun subscribe(it: ObservableEmitter<Int>?) {
                    it?.onNext(1)
                    it?.onNext(2)
                    it?.onNext(3)
                    it?.onComplete()
                    it?.onError(Throwable())
                }
            })
            val observer = object: Observer<Int> {
                override fun onComplete() {
                    debugMsg("onComplete")
                }
    
                override fun onSubscribe(d: Disposable?) {
                    debugMsg("onSubscribe")
                }
    
                override fun onNext(value: Int?) {
                    debugMsg("==",value)
                }
    
                override fun onError(e: Throwable?) {
                    debugMsg("onError")
                }
            }
            observable.subscribe(observer)
    

    日志输出

    D/com.eegegts: .MainActivity$loadRxjava1$observer$1--->onSubscribe
    D/com.eegegts: .MainActivity$loadRxjava1$observer$1--->== 1
    D/com.eegegts: .MainActivity$loadRxjava1$observer$1--->== 2
    D/com.eegegts: .MainActivity$loadRxjava1$observer$1--->== 3
    D/com.eegegts: .MainActivity$loadRxjava1$observer$1--->onComplete
    W/System.err:     at com.eegets.rxjava.MainActivity$loadRxjava1$observable$1.subscribe(MainActivity.kt:76)
    W/System.err:     at com.eegets.rxjava.MainActivity.loadRxjava1(MainActivity.kt:97)
    W/System.err:     at com.eegets.rxjava.MainActivity$onCreate$1.onClick(MainActivity.kt:32)
    

    如上log日志输出,程序崩溃

    在最开始我们说过流式 API, 我们可以改改如上代码,让它看起来更符合RxJava的定义方式

     Observable.create(ObservableOnSubscribe<Int> {
                it?.onNext(1)
                it?.onNext(2)
                it?.onNext(3)
                it?.onComplete()
            }).subscribe(object: Observer<Int> {
                override fun onComplete() {
                    debugMsg("onComplete")
                }
    
                override fun onSubscribe(d: Disposable?) {
                    debugMsg("onSubscribe")
                }
    
                override fun onNext(value: Int?) {
                    debugMsg("==",value)
                }
    
                override fun onError(e: Throwable?) {
                    debugMsg("onError")
                }
            })
    

    本节咱们看了如何用RxJava发送简单的消息,后续咱们还有看很多干货, 下一章 Consumer 走起

    相关文章

      网友评论

          本文标题:RxJava初探

          本文链接:https://www.haomeiwen.com/subject/qvtigctx.html