美文网首页Android开发Android开发经验谈Kotlin编程
Kotlin 实现的基于 RxJava 2.× 的 RxBus

Kotlin 实现的基于 RxJava 2.× 的 RxBus

作者: xiaofei_dev | 来源:发表于2017-11-26 11:49 被阅读416次

    实现

    既然你主动打开了这篇文章,那你肯定了解 RxBus 是什么,以及 RxBus 是干什么用的。所以我就偷个懒不陪你复习基础知识了 #滑稽,下面直接贴代码。
    Kotlin 实现的基于 RxJava 2.× 的 RxBus 单例:

    object RxBus {
        // 支持背压且线程安全的,保证线程安全需要调用 toSerialized() 方法
        private val mBus: FlowableProcessor<Any>
                by lazy { PublishProcessor.create<Any>().toSerialized() }
        //发送事件
        fun post(obj: Any) {
            mBus.onNext(obj)
        }
        //订阅事件
        fun <T> toFlowable(tClass: Class<T>) = mBus.ofType(tClass)
    
        fun toFlowable() = mBus
    
        fun hasSubscribers() = mBus.hasSubscribers()
    
    
        //不支持背压且线程安全的,保证线程安全需要调用 toSerialized() 方法
        private val mBusNB: Subject<Any>
                by lazy { PublishSubject.create<Any>().toSerialized() }
        //发送事件
        fun postNB(obj: Any){
            mBusNB.onNext(obj)
        }
        //订阅事件   
        fun <T> toObservable(tClass: Class<T>): Observable<T> = mBusNB.ofType(tClass)
    
        fun toObservable(): Observable<Any> = mBusNB
    
        fun hasObservers() = mBusNB.hasObservers()
    }
    

    OK,其实没多少代码,我们的 RxBus 单例就实现完成了,下面以支持背压的例子讲讲使用方式。

    使用

    比如我现在在某个目标 Activity 里想订阅 String 类型的消息,消息发送简单起见也在当前 Activity 里发送。

    这里我用一个私有方法订阅字符串消息:

    //接收到消息后只是简单 toast 一下(我是上面所说的私有方法)
    private fun subscribeString() = RxBus.toFlowable(String::class.java)
                                .subscribeOn(Schedulers.io())
                                .observeOn(AndroidSchedulers.mainThread())
                                .subscribe { msg -> toast(msg) }
    
    //具体订阅方式要这样来,我们先在当前 Activity 里维护一个 CompositeDisposable 实例,用于订阅消息和取消订阅关系,避免内存泄漏:
    val mCompositeDisposable: CompositeDisposable = CompositeDisposable()
    
    //再然后是建立具体的订阅关系,比如我在 onCreate 方法里建立具体订阅关系:
    override fun onCreate() {
            super.onCreate()
            ...
            ...
            mCompositeDisposable.add(subscribeString())
        }
    
    
    //在当前 Activity 的 onDestroy 方法里取消订阅关系:
    override fun onDestroy() {
            super.onDestroy()
            mCompositeDisposable.clear()
        }
    
    

    OK,现在我们已经用 RxBus 订阅了字符串消息,下面我们在当前 Activity 的某个地方发送字符串消息:

    //这行代码被调用的时候会弹出 HelloWorld! 的 Toast
    RxBus.post("HelloWorld!")
    

    这样,一个完整的字符串消息订阅关系就建立了。

    完。

    相关文章

      网友评论

        本文标题:Kotlin 实现的基于 RxJava 2.× 的 RxBus

        本文链接:https://www.haomeiwen.com/subject/brrkbxtx.html