美文网首页知识点源码分析Android深入
手撸一个Android经典线程通信框架:Handler

手撸一个Android经典线程通信框架:Handler

作者: Hiworl | 来源:发表于2022-04-03 23:47 被阅读0次

    前言


    2022年已过1/4,时间过的真是快。近些年大Android的发展也很是迅速,尤其是遵循MVVM或者MVI架构下,使用Jetpack + Kotlin + Corroutine + Flow的组合,大大提升了Android应用的开发效率。然而,类似的效率的提升往往是通过层层封装,隐藏底层原理,简化调用,从而达到降低开发的上手门槛目的的。作为一个有品位的开发者,又怎能满足于只了解上层的API调用。本文我们就来通过实例来试着聊一聊Android经典线程通信框架Handler的基本原理。

    目的


    通过实现一个简单的Handler框架,试图解释其底层的工作原理。

    背景知识


    • 线程间通信
      即不同线程之间交换信息,Java中常见的有:

      1. Object
        wait(), notify()/notifyAll()
      2. LockSupport
        park(), uppark()
      3. Condition
        await(), signal()/signalAll()
    • Android的Handler工作原理
      简言之,重复执行“发送消息-分发消息-处理消息”。
      更准确的问题描述应该是:Android的基于Handler + Message + MessageQueue + Looper的消息机制是怎么工作的?

    Handler借助handler#sendMessage(message)方法,把消息Message对象插入消息队列MessageQueue对象中,同时Looper对象循环往复地从MessageQueue对象中取下一个消息Message对象分发给Looper对象所在线程去处理,如此循环往复。

    注意: 同一个线程中,Looper和MessageQueue有且只有一个实例,能且只能在该线程内部实例化。Handler可有多个实例,可在任意线程内实例化,但前需保证实例化动作所在线程的Looper已被初始化。另外,也可以通过调用带Looper参数的Handler构造器实例化,以达到关联指定线程(Looper所在)的目的。

    从上述描述中,我们可以得出几个关键的点:
    消息对象:保存消息的数据结构(会保存发送者的Handler对象引用)
    发送消息:调用者通过Handler#sendMessage(message)发送消息(发到哪里?缓存MessageQueue
    缓存消息MessageQueue保存由外部发送来的消息(怎么保存?类似单向链表结构)
    读取消息Looper#loop()内调用MessageQueue.next()读取消息(没消息了怎么办?阻塞)
    分发消息:取到消息后,执行消息对象持有的Handler对象的Handler#dispatchMessage(message)方法。(运行在哪个线程?Looper对象所在的线程)
    处理消息Handler#dispatchMessage(message)内再调用Handler#handleMessage(message)方法,后者便正是调用者常重写的方法。
    循环往复Looper#loop()内部是一个for(;;){...},一直执行,提供循环往复的驱动力。
    线程内Looper对象唯一性:依靠ThreadLocal保证每个线程内部Looper唯一,即只被实例化一次。
    线程内MessageQueue对象唯一性:被Looper实例化并持有,间接保证了唯一性。

    • Android主线程Handler使用

    应用启动时,ActivityThread.main(String[])内部会提前初始化Looper。故在主线程内部,可直接使用Handler的无参数构造器实例化。

    • Android子线程Handler使用

    如上所述,子线程使用Handler需要先确保Looper已被初始化,但子线程默认是没有初始化Looper的,故需在子线程执行的一开始主动调用Looper.prepare()Looper.loop()。此后则可如在主线程一般任意实例化Handler

    • Android的HandlerThread原理

    即封装了HandlerThread子类。HandlerThread#start()被调用后,该线程内部会初始化Looper实例。其他线程可从该HandlerThread对象中取出Looper实例,并用它构造出新的Handler对象。此后其他线程可借助该Handler对象,调用Handler#sendMessage(message)给HandlerThread的线程发送消息。

    实现

    思路

    在了解了上述的Android的Handler工作原理后,自己实现一个简单的消息框架就有方向了,无非就是解决上述中提到的“消息对象”,“发送消息”,“缓存消息”……关键点即可。

    线程内Looper对象唯一性线程内MessageQueue对象唯一性读取消息是需要着重考虑的。

    编码

    Message

    package com.custom.handler
    
    data class Message(
        val what: Int = 0,
        val args: Any? = null,
        val block: (() -> Unit)? = null,
    ) {
        companion object {
            val HEAD_MESSAGE: Message = Message()
        }
    
        var target: Handler? = null
        var next: Message? = null
    }
    

    MessageQueue

    package com.custom.handler
    
    import java.util.concurrent.locks.ReentrantLock
    import java.util.concurrent.locks.ReentrantReadWriteLock
    import kotlin.concurrent.withLock
    
    class MessageQueue {
        private val queueLock = ReentrantLock()
        private val queueCondition = queueLock.newCondition()
    
        private val messageLock = ReentrantReadWriteLock()
        private val messageReadLock = messageLock.readLock()
        private val messageWriteLock = messageLock.writeLock()
    
        private var currentMessage: Message = Message.HEAD_MESSAGE
            get() = messageReadLock.withLock { field }
            set(value) = messageWriteLock.withLock { field = value }
    
        fun next(): Message {
            while (true) {
                queueLock.withLock {
                    if (currentMessage.next == null) {
                        println("[${Thread.currentThread().name}] Message queue is empty, then it is going to await...")
                        queueCondition.await()
                        println("[${Thread.currentThread().name}] Message queue wakes up.")
                    }
                }
    
                val nextMessage = currentMessage.next!!
                nextMessage.next = null
                currentMessage = nextMessage
                return nextMessage
            }
        }
    
        fun enqueue(msg: Message) {
            messageWriteLock.withLock {
                msg.next = null
                currentMessage.next = msg
                queueLock.withLock { queueCondition.signalAll() }
            }
        }
    }
    

    Looper

    package com.custom.handler
    
    import java.util.concurrent.locks.ReentrantReadWriteLock
    import kotlin.concurrent.withLock
    
    class Looper private constructor(thread: Thread) {
        internal val mMessageQueue = MessageQueue()
        private val mThread = thread
        private val quitFlagReentrantReadWriteLock = ReentrantReadWriteLock()
        private val quitFlagReadLock = quitFlagReentrantReadWriteLock.readLock()
        private val quitFlagWriteLock = quitFlagReentrantReadWriteLock.writeLock()
        private var quitFlag: Boolean? = null
            get() = quitFlagReadLock.withLock { field }
            set(value) = quitFlagWriteLock.withLock { field = value }
    
        companion object {
            private val looperMapReentrantReadWriteLock = ReentrantReadWriteLock()
            private val looperMapReadLock = looperMapReentrantReadWriteLock.readLock()
            private val looperMapWriteLock = looperMapReentrantReadWriteLock.writeLock()
            private val looperMap: MutableMap<Thread, Looper> = mutableMapOf()
    
            val looper: Looper?
                get() = looperMapReadLock.withLock { looperMap[Thread.currentThread()] }
    
            fun prepare(): Looper {
                require(looper == null) { "Looper.prepare() must be called once in each thread." }
                val thread = Thread.currentThread()
                val threadLooper = Looper(thread)
                looperMapWriteLock.withLock { looperMap[thread] = threadLooper }
                return threadLooper
            }
    
            fun loop() {
                requireNotNull(looper) { "Looper.prepare() must be called before Looper.loop()" }
                looper?.quitFlag = false
                while (looper?.quitFlag == false) {
                    val me = looper!!.mMessageQueue
                    val msg = me.next()
                    kotlin.runCatching {
                        println("[${Thread.currentThread().name}] Dispatching message is started.")
                        msg.target?.dispatchMessage(msg)
                        println("[${Thread.currentThread().name}] Dispatching message is stopped. ")
                    }.onFailure {
                        it.printStackTrace()
                    }
                }
            }
        }
    
        fun quit() {
            quitFlag = true
            looperMap.remove(Thread.currentThread())
        }
    }
    

    Handler

    package com.custom.handler
    
    open class Handler {
        private var interceptor: Interceptor? = null
        private lateinit var mLooper: Looper
    
        constructor() {
            val looper = requireNotNull(Looper.looper) { "No looper found in current thread." }
            Handler(looper)
        }
    
        constructor(looper: Looper) {
            mLooper = looper
        }
    
        interface Interceptor {
            fun handleMessage(message: Message?): Boolean
        }
    
        open fun dispatchMessage(message: Message?) {
            println("[${Thread.currentThread().name}] onDispatchMessage: message=$message")
            message ?: return
            if (message.block == null) {
                if (interceptor?.handleMessage(message) != true){
                    handleMessage(message)
                }
                return
            }
            message.block.invoke()
        }
    
        open fun handleMessage(message: Message?) {
            println("[${Thread.currentThread().name}] onReceiveMessage: message=$message")
        }
    
        fun sendMessage(message: Message?) {
            println("[${Thread.currentThread().name}] onSendMessage: message=$message")
            message?.target = this
            mLooper.mMessageQueue.enqueue(message ?: return)
        }
    
        fun quit(){
            mLooper.quit()
        }
    }
    

    [扩展] HandlerThread

    import com.custom.handler.Looper
    import java.util.concurrent.locks.ReentrantLock
    import kotlin.concurrent.withLock
    
    class HandlerThread(name:String):Thread(name) {
        private val lock = ReentrantLock()
        private val condition = lock.newCondition()
        private var mLooper: Looper? = null
        val looper: Looper
            get() {
                if (mLooper == null) {
                    lock.withLock { condition.await() }
                }
                return requireNotNull(mLooper)
            }
    
        override fun run() {
            super.run()
            mLooper = Looper.prepare()
            lock.withLock { condition.signalAll() }
            Looper.loop()
        }
    }
    

    [测试] Main
    由于HandlerThread已经处理了子线程初始化Looper的操作,因此,使用该类直接测试即可。

    import com.custom.handler.Handler
    import com.custom.handler.Message
    
    fun main(args: Array<String>) {
        testHandler()
    }
    
    private fun testHandler(){
        val handlerThread = HandlerThread("Thread-1").apply { start() }
        val handler = object : Handler(handlerThread.looper) {
            override fun handleMessage(message: Message?) {
                super.handleMessage(message)
            }
        }
        for (i in 0..3){
            if (i == 3) {
                handler.quit()
                return
            }
            Thread.sleep(3000)
            println("\n")
            handler.sendMessage(Message(100, "message from Main-Thread"))
        }
    }
    
    // console output
    [Thread-1] Message queue is empty, then it is going to await...
    
    [main] onSendMessage: message=Message(what=100, args=message from Main-Thread, block=null)
    [Thread-1] Message queue wakes up.
    [Thread-1] Dispatching message is started.
    [Thread-1] onDispatchMessage: message=Message(what=100, args=message from Main-Thread, block=null)
    [Thread-1] onReceiveMessage: message=Message(what=100, args=message from Main-Thread, block=null)
    [Thread-1] Dispatching message is stopped. 
    [Thread-1] Message queue is empty, then it is going to await...
    
    [main] onSendMessage: message=Message(what=100, args=message from Main-Thread, block=null)
    [Thread-1] Message queue wakes up.
    [Thread-1] Dispatching message is started.
    [Thread-1] onDispatchMessage: message=Message(what=100, args=message from Main-Thread, block=null)
    [Thread-1] onReceiveMessage: message=Message(what=100, args=message from Main-Thread, block=null)
    [Thread-1] Dispatching message is stopped. 
    [Thread-1] Message queue is empty, then it is going to await...
    
    [main] onSendMessage: message=Message(what=100, args=message from Main-Thread, block=null)
    [Thread-1] Message queue wakes up.
    [Thread-1] Dispatching message is started.
    [Thread-1] onDispatchMessage: message=Message(what=100, args=message from Main-Thread, block=null)
    [Thread-1] onReceiveMessage: message=Message(what=100, args=message from Main-Thread, block=null)
    [Thread-1] Dispatching message is stopped. 
    
    Process finished with exit code 0
    

    Q&A


    1. Handler是怎么实现线程切换的?
      就个人理解而言,所谓“线程切换”,即就是实现线程间的通信即可。此处,我们姑且把“线程”理解为一个无限循环。外界的消息/操作要想插入到这个“线程”,可想象为把消息/操作插入到无限循环中,如此以来便完成了“线程切换”。
      消息插入无限循环:即为存在竞争关系的变量的同步问题。线程外部修改变量,线程内部读取变量。可通过背景知识中提到的各种线程间通信的方法来实现。
      操作插入无限循环:即为注册回调问题。

    发送消息时,调用Handler#sendMessage(message),该方法运行在发生调用动作所在的线程(简称,调用线程);处理消息时,会先后调用Handler#dispatchMessage(messge)Handler#handleMessage(message),此两方法运行在Looper.loop()的无限循环中,即Looper的实例对象所属的线程(简称,处理线程)。而发送消息的本质是:调用线程向处理线程的Looper内的消息队列MessageQueue中插入一个消息对象;处理消息的本质是:处理线程取出消息再调用消息实例对象持有的Handler实例的Handler#handleMessage(message),参数即为当前取出的消息实例对象

    1. 无消息时,MessageQueue#next()如何被阻塞?
      方法内部会先调用android.os.MessageQueue#nativePollOnce(ptr, timeoutMillis)方法。若当前没有消息,则nativePollOnce(ptr, timeoutMillis)会从native层使用epollepoll_wait机制阻塞当前线程,至此MessageQueue.next()也就被阻塞。
    Message next () {
        ...
        for (;;) {
            ...
            android.os.MessageQueue#nativePollOnce
            ...
            return nextMsg
        }
    }
    
    1. 有新消息时,MessageQueue#next()如何被唤醒?
      当消息队列被插入新消息时,MessageQueue#enqueueMessage(msg, when)会被调用,消息插入完成后,其内部android.os.MessageQueue#nativeWake(ptr)会被调用,此时原本被android.os.MessageQueue#nativePollOnce(ptr, timeoutMillis)阻塞的线程就会唤醒,并开始执行取消息和处理消息的流程。
    boolean enqueueMessage (message, when) {
        ...
        //insert message
        ...
        nativeWake(mPtr)
    }
    

    写在最后


    文中提到的ThreadLocal,简言之,就是JDK提供的一种存储“线程内部”变量的手段。Android的基于Handler的消息机制,也正是借助这一点来保存各个线程内部Looper的。当然,若在不考虑性能优化等各种因素条件下,不使用ThreadLocal也是可以实现类似功能的,如本文所实现的消息框架则是使用了Map的数据结构来实现的。 当然,本文的简单实现也旨在解释清楚Android消息机制的原理。

    附:


    源码地址: https://github.com/heychinje/Simple-Handler
    关于Android中的epoll:https://stackoverflow.com/questions/38818642/android-what-is-message-queue-native-poll-once-in-android

    相关文章

      网友评论

        本文标题:手撸一个Android经典线程通信框架:Handler

        本文链接:https://www.haomeiwen.com/subject/agafsrtx.html