美文网首页KotlinRxJavaKotlin
基于Kotlin实现的三种 RxBus

基于Kotlin实现的三种 RxBus

作者: 兰兰笑笑生 | 来源:发表于2017-06-20 10:57 被阅读338次

RxBus 是 EventBus 在RxJava 的替代品 ,而在RxJava 中只需要短短几行代码就能实现。RxJava1 与 RxJava2 有些微不同,具体可以参考 What’s different in 2.0 。下面总结用Kotlin 实现的不同场景的 RxBus :

  • 没有背压处理(Backpressure)的 Rxbus
  • 有背压处理的 RxBus
  • 有异常处理的 Rxbus (订阅者处理事件出现异常也能继续收到事件)

没有背压处理(Backpressure)的 Rxbus

在RxJava2里,引入了Flowable这个类来处理Backpressure,而Observable不包含Backpressure 处理。

object RxBus {
    private val mBus: Subject<Any> = PublishSubject.create()

    fun <T> toObservable(clzz: Class<T>): Observable<T> = mBus.ofType(clzz)

    fun toObservable(): Observable<Any> = mBus

    fun post(obj: Any) {
        mBus.onNext(obj)
    }

    fun hasObservers(): Boolean {
        return mBus.hasObservers()
    }
}

但是在调用toObservable(clzz: Class<T>)的时候,不要简单使用 XXX.class ,因为这样不符合Kotlin的语法,正确的的调用方式是 :

RxBus.toObservable(String::class.java).subscribe( ... )

有背压处理的 RxBus

object RxBus {
    private val mBus: FlowableProcessor<Any> = PublishProcessor.create()

    fun <T> toFlowable(tClass: Class<T>): Flowable<T> {
        return mBus.ofType(tClass)
    }

    fun toFlowable(): Flowable<Any> {
        return mBus
    }

    fun post(obj: Any) {
        mBus.onNext(obj)
    }

    fun hasSubscribers(): Boolean {
        return mBus.hasSubscribers()
    }
}

有异常处理的 Rxbus -基于 RxRelay

RxRelay 是既是Observable也是Consumer的RxJava 类型。

object RxBus3 {
    private val mBus: PublishRelay<Any> = PublishRelay.create()

    fun <T> toObservable(clzz: Class<T>): Observable<T> = mBus.ofType(clzz)

    fun toObservable(): Observable<Any> = mBus

    fun post(obj: Any) {
        mBus.accept(obj)
    }

    fun hasObservers(): Boolean {
        return mBus.hasObservers()
    }
}

关于RxRelay与Subject 的区别,网上一般说"RxRelay 即使出现异常也不会终止订阅关系" , 一开始看到这句话,有点蒙,因为根据我的实际验证,订阅者有异常时,这个订阅会自动取消,也不会影响其它订阅者,这个效果 RxRelay 与 Subject 是一样的。而它们真正的区别是

  • Subject 是Observable也是Observer,所以它可以发送 onComplete 、onError ,这样,会取消订阅所有的订阅者,订阅有被动取消订阅的意思。
  • RxRelay 是Observable也是Consumer , 所以RxRelay 不能发送 onComplete 、onError , 那也就不用担心被中断了。

参考

相关文章

网友评论

    本文标题:基于Kotlin实现的三种 RxBus

    本文链接:https://www.haomeiwen.com/subject/antnqxtx.html