一、新增接收线程枚举类
enum class ThreadMode {
MAIN,
ASYNC
}
在 @Subscribe
注解中新增此变量:
@Retention(AnnotationRetention.RUNTIME)
@Target(AnnotationTarget.FUNCTION)
annotation class Subscribe(val threadMode: ThreadMode = ThreadMode.MAIN)
二、新增 SubcriberMethod 类
在上一篇文章中,观察者是一个Pair<Any, Method>
列表,因为我们只需要保存对象实例和方法用于反射调用即可。但现在观察者多了一个参数,我们无法再使用 Pair 保存此观察者。所以我们为其创建一个对象:
data class SubscriberMethod(val obj: Any, val method: Method, val threadMode: ThreadMode)
KEventBus 中,用 SubscriberMethod 类替换 Pair:
object KEventBus {
private val subscriptionsByEventType = mutableMapOf<Class<*>, MutableList<SubscriberMethod>>()
fun register(obj: Any) {
// Reflect to get all subscribed methods
obj.javaClass.declaredMethods.forEach {
if (it.isAnnotationPresent(Subscribe::class.java)) {
if (it.parameterTypes.size == 1) {
val eventType = it.parameterTypes.first()
if (eventType !in subscriptionsByEventType) {
subscriptionsByEventType[eventType] = mutableListOf()
}
subscriptionsByEventType[eventType]!!.add(SubscriberMethod(obj, it, it.getAnnotation(Subscribe::class.java)!!.threadMode))
}
}
}
}
fun unregister(obj: Any) {
obj.javaClass.declaredMethods.forEach {
if (it.isAnnotationPresent(Subscribe::class.java)) {
if (it.parameterTypes.size == 1) {
val event = it.parameterTypes.first()
if (event in subscriptionsByEventType) {
subscriptionsByEventType[event]?.remove(SubscriberMethod(obj, it, it.getAnnotation(Subscribe::class.java)!!.threadMode))
}
}
}
}
}
fun post(event: Any) {
subscriptionsByEventType[event.javaClass]?.forEach {
it.method(it.obj, event)
}
}
}
接下来我们就开始为 KEventBus 添加线程切换逻辑。
三、主线程消息
想要在主线程接收消息,我们可以通过 Handler(Looper.getMainLooper())
完成。
创建 MainPoster:
class MainPoster(private val eventBus: KEventBus) : Handler(Looper.getMainLooper()) {
private val queue = mutableListOf<Pair<SubscriberMethod, Any>>()
fun post(subscriberMethod: SubscriberMethod, event: Any) {
queue.add(subscriberMethod to event)
sendEmptyMessage(0)
}
override fun handleMessage(msg: Message) {
super.handleMessage(msg)
val pair = queue.removeFirst()
eventBus.invokeSubscriber(pair.first, pair.second)
}
}
需要注意的是,每一条消息都需要和 SubscriberMethod 绑定,因为 post 消息时,会为每一个观察者函数 post 一条消息,必须确保接收者只有一个,避免重复接收消息。
四、子线程消息
在子线程发送消息,可以通过线程池完成。
创建 AsyncPoster:
class AsyncPoster(private val eventBus: KEventBus) : Runnable {
private val queue = mutableListOf<Pair<SubscriberMethod, Any>>()
private val executors = Executors.newCachedThreadPool()
fun post(subscriberMethod: SubscriberMethod, event: Any) {
queue.add(subscriberMethod to event)
executors.execute(this)
}
override fun run() {
val pair = queue.removeFirst()
eventBus.invokeSubscriber(pair.first, pair.second)
}
}
五、添加线程切换逻辑
在 KEventBus 中添加线程切换逻辑:
object KEventBus {
//...
private val mainPoster = MainPoster(this)
private val asyncPoster= AsyncPoster(this)
fun post(event: Any) {
subscriptionsByEventType[event.javaClass]?.forEach {
when (it.threadMode) {
ThreadMode.MAIN -> {
mainPoster.post(it, event)
}
ThreadMode.ASYNC -> {
asyncPoster.post(it, event)
}
}
}
}
fun invokeSubscriber(subscriberMethod: SubscriberMethod, event: Any) {
subscriberMethod.method(subscriberMethod.obj, event)
}
}
六、测试
@Subscribe(threadMode = ThreadMode.ASYNC)
fun onAsyncEvent(message: String) {
Log.d("~~~", "$message ${Thread.currentThread().name}")
}
将 threadMode 设置为 ASYNC,收到消息时,我们会在 Log 控制台中看到当前线程属于子线程。
例如:
~~~: It's an async message. pool-1-thread-1
这样我们就实现了 EventBus 线程切换的逻辑。实际上,EventBus 中一共有五种 ThreadMode:
- POSTING:与消息发送时所在的线程相同(ThreadMode 的默认值)
- MAIN:主线程。如果发消息时,已经在主线程,则直接发送消息,否则使用主线程的 Handler 发送消息。
- MAIN_ORDERED:主线程且保证有序。使用主线程的 Handler 发送消息。
- BACKGROUND:子线程。如果发消息时,已经在子线程,则直接发送消息,否则使用线程池发送消息。
- ASYNC:子线程。使用线程池发送消息。
网友评论