实现
既然你主动打开了这篇文章,那你肯定了解 RxBus 是什么,以及 RxBus 是干什么用的。所以我就偷个懒不陪你复习基础知识了 #滑稽,下面直接贴代码。
Kotlin 实现的基于 RxJava 2.× 的 RxBus 单例:
object RxBus {
// 支持背压且线程安全的,保证线程安全需要调用 toSerialized() 方法
private val mBus: FlowableProcessor<Any>
by lazy { PublishProcessor.create<Any>().toSerialized() }
//发送事件
fun post(obj: Any) {
mBus.onNext(obj)
}
//订阅事件
fun <T> toFlowable(tClass: Class<T>) = mBus.ofType(tClass)
fun toFlowable() = mBus
fun hasSubscribers() = mBus.hasSubscribers()
//不支持背压且线程安全的,保证线程安全需要调用 toSerialized() 方法
private val mBusNB: Subject<Any>
by lazy { PublishSubject.create<Any>().toSerialized() }
//发送事件
fun postNB(obj: Any){
mBusNB.onNext(obj)
}
//订阅事件
fun <T> toObservable(tClass: Class<T>): Observable<T> = mBusNB.ofType(tClass)
fun toObservable(): Observable<Any> = mBusNB
fun hasObservers() = mBusNB.hasObservers()
}
OK,其实没多少代码,我们的 RxBus 单例就实现完成了,下面以支持背压的例子讲讲使用方式。
使用
比如我现在在某个目标 Activity 里想订阅 String 类型的消息,消息发送简单起见也在当前 Activity 里发送。
这里我用一个私有方法订阅字符串消息:
//接收到消息后只是简单 toast 一下(我是上面所说的私有方法)
private fun subscribeString() = RxBus.toFlowable(String::class.java)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe { msg -> toast(msg) }
//具体订阅方式要这样来,我们先在当前 Activity 里维护一个 CompositeDisposable 实例,用于订阅消息和取消订阅关系,避免内存泄漏:
val mCompositeDisposable: CompositeDisposable = CompositeDisposable()
//再然后是建立具体的订阅关系,比如我在 onCreate 方法里建立具体订阅关系:
override fun onCreate() {
super.onCreate()
...
...
mCompositeDisposable.add(subscribeString())
}
//在当前 Activity 的 onDestroy 方法里取消订阅关系:
override fun onDestroy() {
super.onDestroy()
mCompositeDisposable.clear()
}
OK,现在我们已经用 RxBus 订阅了字符串消息,下面我们在当前 Activity 的某个地方发送字符串消息:
//这行代码被调用的时候会弹出 HelloWorld! 的 Toast
RxBus.post("HelloWorld!")
这样,一个完整的字符串消息订阅关系就建立了。
完。
网友评论