我们在学习RxJava之前要了解一下,为什么使用RxJava, 使用RxJava有什么好处
RxJava特性:
轻松的切换线程,流式的API以及有强大的操作符,这使得我们做异步操作时变的简单,不用像以前一样写各种的Handler来回调主线程.只需要一个操作符一行代码就可以搞定.
我所理解的RxJava的解释
之前也看过RxJava类的文章,但是一直都是一脸迷茫,观察者被观察者一大堆都搞晕了,后来偶然间看到一位大神的解释,瞬间明白了,他将RxJava的观察者和被观察者比喻为水管的上下游,我更愿意把这个概念理解为电话这头和那头.
有两部有线电话, 通过一根网线连接, A给B打电话, 我们可以把A理解为上游, B自然就是下游了, 他们之间通过电话线连接
开始学习
我们从最简单的RxJava使用入手,先上代码
val observable = Observable.create(object :ObservableOnSubscribe<Int> {
override fun subscribe(it: ObservableEmitter<Int>?) {
it?.onNext(1)
it?.onNext(2)
it?.onComplete()
}
})
val observer = object: Observer<Int> {
override fun onComplete() {
debugMsg("onComplete")
}
override fun onSubscribe(d: Disposable?) {
debugMsg("onSubscribe")
}
override fun onNext(value: Int?) {
debugMsg("==",value)
}
override fun onError(e: Throwable?) {
debugMsg("onError")
}
}
observable.subscribe(observer)
根据电话的例子,我们看看上述代码
上游:
-
observable
可以看成是我们的上游 -
ObservableEmitter<Int>
Emitter指发射器的意思,该类的解释就是被观察者发射器,也就是上游的发射器 -
it?.onNext(2)
上游发射器通过方法onNext()方法发射数据
下游
- observer 当然就是下游了
- onSubscribe() 下游开始接收数据就会触发
- onNext() 下游正式接收上游数据
- onError() 下游接收到上游发送的异常消息
- onComplete(() 下游消息接收完成的标志
上下游连接
subscribe 上下游通过subscribe连接, 此处需要注意 上下游连接只能连接一次(通俗一点就是 subscribe 只使用一次)
我们逐行解释一下:
1: 我们创建一个上游(Observable.create
), 然后通过 it.onNext(1) it.onNext(2) it.onComplete(2)
发送了三条消息 "1", "2"以及 "完成"
给下游
2: 同样我们必须定义一个接受者也就是下游 (object: Observer<Int>
) ,用来接收上游发送的消息,因为我们上游发送的消息为Int类型, 所以我们此处接受下游的类型为Int
3: 通过 observable.subscribe(observer)
连接上下游
日志输出
D/com.eegegts: .MainActivity$loadRxjava1$observer$1--->onSubscribe
D/com.eegegts: .MainActivity$loadRxjava1$observer$1--->== 1
D/com.eegegts: .MainActivity$loadRxjava1$observer$1--->== 2
D/com.eegegts: .MainActivity$loadRxjava1$observer$1--->onComplete
根据日志,我们可以看出,当我们的上下游建立连接时,首先会执行 onSubscribe() ,接着通过onNext()接收到我们上游发送的数据 "1", "2", 数据全部接收完成后会执行 onComplete()
此处需要注意几个地方:
- 上游不能发送多个onError,发送多个会导致程序崩溃
- 上游发送多个onComplete,下游只接受一个
- 上游发送的onComplete和onError必须是互斥
- 下游接收到onComplete时再不会接受上游事件
我们用代码解释一下注意的几个地方
1: 我们在上游添加几个onComplete()
override fun subscribe(it: ObservableEmitter<Int>?) {
it?.onNext(1)
it?.onNext(2)
it?.onNext(3)
it?.onComplete()
it?.onComplete()
it?.onComplete()
}
})
val observer = object: Observer<Int> {
override fun onComplete() {
debugMsg("onComplete")
}
override fun onSubscribe(d: Disposable?) {
debugMsg("onSubscribe")
}
override fun onNext(value: Int?) {
debugMsg("==",value)
}
override fun onError(e: Throwable?) {
debugMsg("onError")
}
}
observable.subscribe(observer)
日志输出
D/com.eegegts: .MainActivity$loadRxjava1$observer$1--->onSubscribe
D/com.eegegts: .MainActivity$loadRxjava1$observer$1--->== 1
D/com.eegegts: .MainActivity$loadRxjava1$observer$1--->== 2
D/com.eegegts: .MainActivity$loadRxjava1$observer$1--->== 3
D/com.eegegts: .MainActivity$loadRxjava1$observer$1--->onComplete
可以看到我们的onComplete只输出了一次
2: 上游只要发送了onComplete, 下游不会再接收后续的消息
override fun subscribe(it: ObservableEmitter<Int>?) {
it?.onNext(1)
it?.onNext(2)
it?.onNext(3)
it?.onComplete()
it?.onNext(4)
}
})
val observer = object: Observer<Int> {
override fun onComplete() {
debugMsg("onComplete")
}
override fun onSubscribe(d: Disposable?) {
debugMsg("onSubscribe")
}
override fun onNext(value: Int?) {
debugMsg("==",value)
}
override fun onError(e: Throwable?) {
debugMsg("onError")
}
}
observable.subscribe(observer)
日志输出
D/com.eegegts: .MainActivity$loadRxjava1$observer$1--->onSubscribe
D/com.eegegts: .MainActivity$loadRxjava1$observer$1--->== 1
D/com.eegegts: .MainActivity$loadRxjava1$observer$1--->== 2
D/com.eegegts: .MainActivity$loadRxjava1$observer$1--->== 3
D/com.eegegts: .MainActivity$loadRxjava1$observer$1--->onComplete
3: 上游发送的onComplete和onError必须是互斥 以及发送多个onError报错
val observable = Observable.create(object :ObservableOnSubscribe<Int> {
override fun subscribe(it: ObservableEmitter<Int>?) {
it?.onNext(1)
it?.onNext(2)
it?.onNext(3)
it?.onComplete()
it?.onError(Throwable())
}
})
val observer = object: Observer<Int> {
override fun onComplete() {
debugMsg("onComplete")
}
override fun onSubscribe(d: Disposable?) {
debugMsg("onSubscribe")
}
override fun onNext(value: Int?) {
debugMsg("==",value)
}
override fun onError(e: Throwable?) {
debugMsg("onError")
}
}
observable.subscribe(observer)
日志输出
D/com.eegegts: .MainActivity$loadRxjava1$observer$1--->onSubscribe
D/com.eegegts: .MainActivity$loadRxjava1$observer$1--->== 1
D/com.eegegts: .MainActivity$loadRxjava1$observer$1--->== 2
D/com.eegegts: .MainActivity$loadRxjava1$observer$1--->== 3
D/com.eegegts: .MainActivity$loadRxjava1$observer$1--->onComplete
W/System.err: at com.eegets.rxjava.MainActivity$loadRxjava1$observable$1.subscribe(MainActivity.kt:76)
W/System.err: at com.eegets.rxjava.MainActivity.loadRxjava1(MainActivity.kt:97)
W/System.err: at com.eegets.rxjava.MainActivity$onCreate$1.onClick(MainActivity.kt:32)
如上log日志输出,程序崩溃
在最开始我们说过流式 API, 我们可以改改如上代码,让它看起来更符合RxJava的定义方式
Observable.create(ObservableOnSubscribe<Int> {
it?.onNext(1)
it?.onNext(2)
it?.onNext(3)
it?.onComplete()
}).subscribe(object: Observer<Int> {
override fun onComplete() {
debugMsg("onComplete")
}
override fun onSubscribe(d: Disposable?) {
debugMsg("onSubscribe")
}
override fun onNext(value: Int?) {
debugMsg("==",value)
}
override fun onError(e: Throwable?) {
debugMsg("onError")
}
})
本节咱们看了如何用RxJava发送简单的消息,后续咱们还有看很多干货, 下一章 Consumer
走起
网友评论