RxBus 是 EventBus 在RxJava 的替代品 ,而在RxJava 中只需要短短几行代码就能实现。RxJava1 与 RxJava2 有些微不同,具体可以参考 What’s different in 2.0 。下面总结用Kotlin 实现的不同场景的 RxBus :
- 没有背压处理(Backpressure)的 Rxbus
- 有背压处理的 RxBus
- 有异常处理的 Rxbus (订阅者处理事件出现异常也能继续收到事件)
没有背压处理(Backpressure)的 Rxbus
在RxJava2里,引入了Flowable这个类来处理Backpressure,而Observable不包含Backpressure 处理。
object RxBus {
private val mBus: Subject<Any> = PublishSubject.create()
fun <T> toObservable(clzz: Class<T>): Observable<T> = mBus.ofType(clzz)
fun toObservable(): Observable<Any> = mBus
fun post(obj: Any) {
mBus.onNext(obj)
}
fun hasObservers(): Boolean {
return mBus.hasObservers()
}
}
但是在调用toObservable(clzz: Class<T>)
的时候,不要简单使用 XXX.class ,因为这样不符合Kotlin的语法,正确的的调用方式是 :
RxBus.toObservable(String::class.java).subscribe( ... )
有背压处理的 RxBus
object RxBus {
private val mBus: FlowableProcessor<Any> = PublishProcessor.create()
fun <T> toFlowable(tClass: Class<T>): Flowable<T> {
return mBus.ofType(tClass)
}
fun toFlowable(): Flowable<Any> {
return mBus
}
fun post(obj: Any) {
mBus.onNext(obj)
}
fun hasSubscribers(): Boolean {
return mBus.hasSubscribers()
}
}
有异常处理的 Rxbus -基于 RxRelay
RxRelay 是既是Observable也是Consumer的RxJava 类型。
object RxBus3 {
private val mBus: PublishRelay<Any> = PublishRelay.create()
fun <T> toObservable(clzz: Class<T>): Observable<T> = mBus.ofType(clzz)
fun toObservable(): Observable<Any> = mBus
fun post(obj: Any) {
mBus.accept(obj)
}
fun hasObservers(): Boolean {
return mBus.hasObservers()
}
}
关于RxRelay与Subject 的区别,网上一般说"RxRelay 即使出现异常也不会终止订阅关系" , 一开始看到这句话,有点蒙,因为根据我的实际验证,订阅者有异常时,这个订阅会自动取消,也不会影响其它订阅者,这个效果 RxRelay 与 Subject 是一样的。而它们真正的区别是
- Subject 是Observable也是Observer,所以它可以发送 onComplete 、onError ,这样,会取消订阅所有的订阅者,订阅有被动取消订阅的意思。
- RxRelay 是Observable也是Consumer , 所以RxRelay 不能发送 onComplete 、onError , 那也就不用担心被中断了。
网友评论