Rxbus2实现详解

作者: 皮球二二 | 来源:发表于2017-06-26 14:50 被阅读262次

    熟悉过Rxbus1的朋友应该都知道,Rxbus1利用Rxjava中的PublishSubject的特性(只会把在订阅发生的时间点之后来自原始Observable的数据发射给订阅者)来完成事件的通知和订阅。同样Rxjava2也同样可以这样来实现,但是如果就这么换个库就结束了,本文也失去了存在的意义了是吧?

    代码已经上传到github

    准备工作

    1. Rxjava的新特性
      先来看看这三行代码
    val subject: Subject<Any> = PublishSubject.create<Any>().toSerialized()
    val publishProcessor: FlowableProcessor<Any> = PublishProcessor.create<Any>().toSerialized()
    val relay: Relay<Any> = PublishRelay.create<Any>().toSerialized()
    

    第一行就是之前所说的Rxjava1直接升级到Rxjava2,这个没什么说法
    第二行就是Rxjava2背压处理新特性。原来的io.reactivex.Observable不支持背压处理,如果有大量消息堆积在总线中来不及处理就会产生MissingBackpressureException或者OutOfMemoryError,因此新类io.reactivex.Flowable用来专门处理背压问题。
    第三行是JakeWharton的杰作RxRelay,它解决了在订阅者处理事件出现异常后,订阅者无法再收到事件这种情况。这个不是Rxjava的错误,而是Rxjava的设计原则就是如此。
    所以我们使用RxRelay来实现有异常处理能力的Rxbus。

    1. EventBus实现原理
      你可以直接阅读这篇文章Android EventBus源码解析 带你深入理解EventBus,以便于对整个事件总线流程有个初步印象

    具体实现

    之前我们提到了用RxRelay来处理订阅后发生的事件,那么这个事件是什么?我们先回忆一下EventBus的使用过程:首先在onCreate里面注册事件,随后我们实现订阅者(比如加了Subscribe注解的onEventMainThread方法),事件发生的时候由被订阅者触发(比如EventBus.getDefault().post(bean)),使用完毕之后我们在onDestory里面去解绑事件。OK没问题,我们需要顺着这个思路去实现我们的代码

    1. 自定义异常声明类RxbusException
    class RxbusException : RuntimeException {
          constructor(message: String) : super(message)
    }
    
    1. 定义一个线程处理枚举类ThreadMode。这个模仿了EventBus中的@Subscribe注解中的threadMode,用来设置订阅者所在的线程
    enum class ThreadMode {
          SINGLE, COMPUTATION, IO, TRAMPOLINE, NEW_THREAD, MAIN
    }
    

    枚举类的值与Rxjava以及RxAndroid内置了几个Scheduler一致,如有疑问请自行查阅相关内容

    1. 用来注解方法的类Subscribe。这个模仿了EventBus中的@Subscribe注解,该注解作用在方法上,默认值为MAIN
    @MustBeDocumented
    @Target(AnnotationTarget.FUNCTION)
    @Retention(AnnotationRetention.RUNTIME)
    annotation class Subscribe(val threadMode: ThreadMode = ThreadMode.MAIN)
    

    这里注意下kotlin中的注解的写法

    1. 既然注解是加在方法上的,那我们就要通过反射方式去调用这个方法。那这个方法又怎么能被找到呢?那肯定需要一个管理类进行注册、存储、解绑,这就是我们Rxbus的核心部分
      首先这个类肯定是一个单例
    private val mRelay: Relay<Any>
    private val mDisposableMap: MutableMap<Class<*>, MutableMap<Class<*>, Disposable>>
    init {
          mRelay = PublishRelay.create<Any>().toSerialized()
          mDisposableMap = mutableMapOf()
    }
    private object Instance {
          val RXBUS = RxBus()
    }
    companion object {
          val default: RxBus
              get() = Instance.RXBUS
    }
    

    采用静态内部类的形式实现。这里我们初始化了Relay还有一个map,这个map的泛型是<订阅者所在类, <事件对象类型, 事件对应的Disposable>>。这种结构也强调了在订阅者所在类中,某种事件对象类型必须唯一,不能有多个方法处理同一种事件对象类型

    1. 工具类有了,我们就要在工具类里实现各种方法吧。先是注册。
    fun regist(targetClass: Class<*>) {
          for (subscriberMethod in SubscriberMethodFinder.findUsingReflection(targetClass)) {
                addSubscriber(targetClass, subscriberMethod)
    }
    

    上面代码是不完整的。它的含义是找到这个订阅者所在类中所有的Rxbus的反射方法,并将这些方法存储在刚才定义的map中。要符合Rxbus的方法才行,如代码所示,方法必须是public并且方法不能是abstract或者static或者synchronized、只有一个入参的注解方法

    object SubscriberMethodFinder {
    
        val MODIFIERS_IGNORE: Int = Modifier.ABSTRACT.or(Modifier.STATIC).or(Modifier.SYNCHRONIZED)
    
        fun findUsingReflection(class_: Class<*>): List<SubscriberMethod> {
            var lists: MutableList<SubscriberMethod> = ArrayList()
            for (declaredMethod in class_.declaredMethods) {
                val modifiers: Int = declaredMethod.modifiers
                // 方法必须是public并且方法不能是abstract或者static或者synchronized
                if (modifiers.and(Modifier.PUBLIC)!= 0 && modifiers.and(SubscriberMethodFinder.MODIFIERS_IGNORE) == 0) {
                    val parameterTypes: Array<Class<*>> = declaredMethod.parameterTypes
                    // 只有一个入参
                    if (parameterTypes.size==1) {
                        val subscribe = declaredMethod.getAnnotation(Subscribe::class.java)
                        // 判断是否为自定义注解对象
                        if (subscribe!=null) {
                            val threadMode: ThreadMode = subscribe.threadMode
                            val classType: Class<*> = parameterTypes[0]
                            lists.add(SubscriberMethod(getThreadMode(threadMode), classType, declaredMethod))
                        }
                    }
                }
            }
            // 如果没有添加过
            if (lists.size==0) {
                throw RxbusException("Subscriber ${class_::class.java} and its super classes have no public methods with the @Subscribe annotation")
            }
            return lists
        }
    
        /**
         * 通过自身定义线程类型,获取Rxjava中的线程类型
         */
        fun getThreadMode(threadMode: ThreadMode): io.reactivex.Scheduler =
            when(threadMode) {
                ThreadMode.MAIN -> AndroidSchedulers.mainThread()
                ThreadMode.COMPUTATION -> Schedulers.computation()
                ThreadMode.IO -> Schedulers.io()
                ThreadMode.NEW_THREAD -> Schedulers.newThread()
                ThreadMode.SINGLE -> Schedulers.single()
                ThreadMode.TRAMPOLINE -> Schedulers.trampoline()
                else -> AndroidSchedulers.mainThread()
            }
    }
    

    SubscriberMethod对象存储了每一个方法的信息

    data class SubscriberMethod(val threadMode: Scheduler, val eventClass: Class<*>, val method: Method)
    

    找到之后添加进map中

        private fun addSubscriber(targetClass: Class<*>, subscriberMethod: SubscriberMethod) {
            synchronized(RxBus::class.java) {
                val eventClass: Class<*> = subscriberMethod.eventClass
                val disposable: Disposable = mRelay.ofType(eventClass).observeOn(subscriberMethod.threadMode).subscribe({
                    subscriberMethod.method.invoke(targetClass.newInstance(), it)
                })
                var map: MutableMap<Class<*>, Disposable>
                if (mDisposableMap.get(targetClass)==null) {
                    map = mutableMapOf()
                    mDisposableMap.put(targetClass, map)
                    map.put(eventClass, disposable)
                }
                else {
                    map = mDisposableMap.get(targetClass)!!
                    map.put(eventClass, disposable)
                }
            }
        }
    

    事件通过反射去处理

    1. 发射事件
        fun post(obj: Any) {
            synchronized(RxBus::class.java) {
                if (mRelay.hasObservers()) mRelay.accept(obj)
            }
        }
    

    这里你可以想象一下EventBus的post方法

    1. 解绑事件
      解除订阅者所在类的所有类型事件与某种特定类型事件两种方法
        fun unregist(targetClass: Class<*>) {
            synchronized(RxBus::class.java) {
                if (mDisposableMap.get(targetClass) != null) {
                    val map: MutableMap<Class<*>, Disposable> = mDisposableMap.get(targetClass)!!
                    for ((key, value) in map) {
                        value.dispose()
                    }
                    mDisposableMap.remove(targetClass)
                }
                else {
                    throw RxbusException("${targetClass::class.java} haven't registered RxBus")
                }
            }
        }
        fun unregist(targetClass: Class<*>, eventClass: Class<*>) {
            synchronized(RxBus::class.java) {
                if (mDisposableMap.get(targetClass) != null) {
                    val map: MutableMap<Class<*>, Disposable> = mDisposableMap.get(targetClass)!!
                    if (map.containsKey(eventClass) != null) {
                        map.get(eventClass)!!.dispose()
                        map.remove(eventClass)
                    }
                    else {
                        throw RxbusException("The event with type of ${eventClass::class.java} is not" +
                                " required in ${targetClass::class.java}")
                    }
                }
                else {
                    throw RxbusException("${targetClass::class.java} haven't registered RxBus")
                }
            }
        }
    

    使用

    代码就这么多,最后来看看如何使用

    class MainActivity : AppCompatActivity() {
    
        override fun onCreate(savedInstanceState: Bundle?) {
            super.onCreate(savedInstanceState)
            setContentView(R.layout.activity_main)
    
            RxBus.default.regist(MainActivity::class.java)
    
            btn_rxbus.setOnClickListener {
                RxBus.default.post(ExampleBean("123"))
            }
        }
    
        override fun onDestroy() {
            super.onDestroy()
    
            RxBus.default.unregist(MainActivity::class.java)
        }
    
        @Subscribe(threadMode = ThreadMode.MAIN)
        fun onEventMainThread(exampleBean: ExampleBean) {
            Log.d("MainActivity", "receive ${exampleBean.string}")
        }
    }
    

    我们注册了当前activity,并且设置了ExampleBean这个事件类,现在来发射看看


    使用

    没毛病

    参考文章

    KingJA/RxBus2
    yuli2039/Gank4k
    RxJava2版本的Rxbus

    相关文章

      网友评论

        本文标题:Rxbus2实现详解

        本文链接:https://www.haomeiwen.com/subject/ekpdcxtx.html