美文网首页
基于rxjava2 & kotlin实现的RxBus

基于rxjava2 & kotlin实现的RxBus

作者: RainYue | 来源:发表于2017-12-08 23:05 被阅读0次

    这是一个基于rxjava2实现的事件总线类,可以方便的使用关键字来发送和订阅信息。

    代码

    class RxBus {
        companion object {
            private val _instance: RxBus by lazy { RxBus() }
            val instance get() = _instance
        }
    
        data class RxMsg(val action: String, val event: Any)
    
        private val bus: FlowableProcessor<RxMsg> by lazy { PublishProcessor.create<RxMsg>().toSerialized() }
        private val map = mutableMapOf<Any, MutableMap<String, MutableList<Disposable>>>()
    
        fun <T : Any> post(a: String, o: T) = SerializedSubscriber(bus).onNext(RxMsg(a, o))
    
        fun <T> flowable(clazz: Class<T>,
                         action: String,
                         scheduler: Scheduler = AndroidSchedulers.mainThread()): Flowable<T> {
            return bus.ofType(RxMsg::class.java).filter {
                it.action == action && clazz.isInstance(it.event)
            }.map { clazz.cast(it.event) }.observeOn(scheduler)
        }
    
        inline fun <reified T> flowable(action: String,
                                        scheduler: Scheduler = AndroidSchedulers.mainThread()): Flowable<T> =
                flowable(T::class.java, action, scheduler)
    
        fun <T> subscribe(clazz: Class<T>,
                          target: Any,
                          action: String,
                          scheduler: Scheduler = AndroidSchedulers.mainThread(),
                          call: (T) -> Unit): Disposable =
                flowable(clazz, action, scheduler).subscribe { call(it) }.also { obs ->
                    map.getOrPut(target, { mutableMapOf() }).getOrPut(action, { mutableListOf() }).add(obs)
                }
    
        inline fun <reified T> subscribe(target: Any,
                                         action: String,
                                         scheduler: Scheduler = AndroidSchedulers.mainThread(),
                                         noinline call: (T) -> Unit): Disposable =
                subscribe(T::class.java, target, action, scheduler, call)
    
        fun unsubscribe(target: Any, action: String? = null) {
            map[target]?.let {
                if (action != null) it.remove(action)?.onEach { it.dispose() }
                else it.onEach { it.value.forEach { it.dispose() } }.clear()
                if (it.isEmpty()) map.remove(target)
            }
        }
    }
    

    使用

    • 在指定对象上按关键字订阅, 可以按指定关键字(action)和类型(String)接收数据。
    RxBus.instance.subscribe<String>(this, "action") { println(it) }
    
    • 按关键字取消订阅
    RxBus.instance.unsubscribe(this, "action")
    
    • 取消指定对象上的全部订阅
    RxBus.instance.unsubscribe(this)
    
    • 发送消息,会自动发送指定类型数据给订阅者。
    RxBus.instance.post("action", "msg")
    
    • 也可不依赖于一个对象,这样需要手工调用 dispose() 取消订阅,或者使用RxLifecycle管理订阅。
    val disposable = RxBus.instance.flowable<String>("action").observeOn(AndroidSchedulers.mainThread()).subscribe {
        println(it)
    }
    disposable.dispose()
    

    示例

    override fun onCreate(state: Bundle?) {
        super.onCreate(state)
        setContentView(R.layout.activity_main)
        RxBus.instance.subscribe<String>(this, "click") { 
            Toast.makeText(this, it, Toast.LENGTH_SHORT).show()
        }
        findViewById<Button>(R.id.button).setOnClickListener {
            RxBus.instance.post("click", "test")
        }
    }
    override fun onDestroy() {
        super.onDestroy()
        RxBus.instance.unsubscribe(this, "day_night")
    }
    

    应用 https://github.com/yueeng/meitu

    aitaotu meitulu meituri

    相关文章

      网友评论

          本文标题:基于rxjava2 & kotlin实现的RxBus

          本文链接:https://www.haomeiwen.com/subject/qayyixtx.html