美文网首页
Handler消息机制

Handler消息机制

作者: hewenyu | 来源:发表于2020-09-06 23:02 被阅读0次
    // 涉及的Java层代码
    frameworks\base\core\java\android\os\
            - Handler.java
            - Looper.java
            - Message.java
            - MessageQueue.java
            
    // 涉及的 Native 层代码
    frameworks\base\core\jni\android_os_MessageQueue.cpp
    system\core\libutils\Looper.cpp
    
    

    1.消息队列的创建

    1.1 Looper 的创建

    frameworks\base\core\java\android\os\Looper.java

    public final class Looper {
        ...
        // 主线程的 Looper 对象
        private static Looper sMainLooper;  // guarded by Looper.class
        // 消息队列
        final MessageQueue mQueue;
        
        public static void prepare() {
            prepare(true);
        }
    
        // quitAllowed 表示消息队列是否可以退出
        private static void prepare(boolean quitAllowed) {
            // sThreadLocal 可以理解为线程局部变量,用来保存该线程关联的 Looper 对象
            // 一个线程多次调用 prepare() 方法会抛异常
            if (sThreadLocal.get() != null) {
                throw new RuntimeException("Only one Looper may be created per thread");
            }
            // 创建 Looper 对象
            sThreadLocal.set(new Looper(quitAllowed));
        }
    
        // prepareMainLooper 函数创建的是主线程的 Looper 对象,在 ActivityThread 的 main 函数中调用
        public static void prepareMainLooper() {
            // 调用 prepare函数,入参是 false,表示主线程的消息队列不允许退出
            prepare(false);
            synchronized (Looper.class) {
                if (sMainLooper != null) {
                    throw new IllegalStateException("The main Looper has already been prepared.");
                }
                // 保存主线程的 Looper 对象,方便其它线程获取
                sMainLooper = myLooper();
            }
        }
    
        // 构造函数私有化,不能被外部创建
        private Looper(boolean quitAllowed) {
            // 创建一个消息队列
            mQueue = new MessageQueue(quitAllowed);
            mThread = Thread.currentThread();
        }
    
        ...
    }
    

    可以看到 Looper 类主要做了如下事情

    • 私有化构造函数,同时提供了 preapare()prepareMainLooper() 两个静态函数通过 ThreadLocal 线程局部变量,来保证每个线程只能持有一个 Looper 对象;
    • prepareMainLooper() 函数在主线程的 ActivityThread->main() 中调用,然后将主线程的 Looper 对象保存在 sMainLooper 中;
    • Looper 对象在创建的时候会创建一个 MessageQueue 对象并保存在 mQueue 对象中,同时获取了当前的线程保存在 mThread 中;

    1.2 MessageQueue 消息队列的创建

    frameworks\base\core\java\android\os\MessageQueue.java

    public final class MessageQueue {
        ...
     
        private native static long nativeInit();
     
        MessageQueue(boolean quitAllowed) {
            // 记录消息队列是否允许退出
            mQuitAllowed = quitAllowed;
            // 调用 native 层的 init 函数并返回一个指针地址
            mPtr = nativeInit();
        }
     
        ...
    }
    

    Java 层中的消息队列 MessageQueue 在初始化的时候比较简,主要记录了两个字段,一个是消息队列是否允许退出,第二个则是调用了native层的 init 函数获取了一个指针地址,该函数位于 android_os_MessageQueue.cpp 文件内

    frameworks\base\core\jni\android_os_MessageQueue.cpp

    
    static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) {
        // 创建Navice层的 NativeMessageQueue 对象
        NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();
        if (!nativeMessageQueue) {
            jniThrowRuntimeException(env, "Unable to allocate native queue");
            return 0;
        }
    
        nativeMessageQueue->incStrong(env);
        return reinterpret_cast<jlong>(nativeMessageQueue);
    }
    
    NativeMessageQueue::NativeMessageQueue() :
            mPollEnv(NULL), mPollObj(NULL), mExceptionObj(NULL) {
        // 获取 Native 层的 Looper 对象
        // 通过 pthread_getspecific() 获取 Native 层的 Looper 对象,类似 Java 层的 mThreadLocal.get()
        mLooper = Looper::getForThread();
        if (mLooper == NULL) {  
            // 创建Native层的 Looper 对象
            mLooper = new Looper(false);
            // 通过 pthread_setspecific() 保存 Native 层的 Looper 对象,类似 Java 层的 mThreadLocal.set()
            Looper::setForThread(mLooper);
        }
    }
    
    

    Native 层的 init 函数的主要功能为

    • 创建 Native 层的消息队列 NativeMessageQueue
      • 调用 Looper::getForThread() (类似java层的 sThreadLocal.get()) 尝试获取 Native 层的 Looper 对象;
      • Looper::getForThread() 返回的 Looper == null 则创建 Looper对象并调用 Looper::setForThread() (类似java层的 sThreadLocal.set())将 Looper 对象保存到线程的局部变量中;
    • 增加强引用计数器,返回 NativeMessageQueue队列的指针;

    system\core\libutils\Looper.cpp

    
    // Native 层 Looper 对象构造函数
    Looper::Looper(bool allowNonCallbacks) :
            mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false),
            mPolling(false), mEpollFd(-1), mEpollRebuildRequired(false),
            mNextRequestSeq(0), mResponseIndex(0), mNextMessageUptime(LLONG_MAX) {
        // 创建一个用来通知事件的文件描述符 mWakeEventFd
        // EFD_NONBLOCK 表示文件执行 read/write 操作时,不会阻塞
        mWakeEventFd = eventfd(0, EFD_NONBLOCK);
    
        AutoMutex _l(mLock);
        // 重建 epoll 事件
        rebuildEpollLocked();
    }
    
    void Looper::rebuildEpollLocked() {
        // 关闭原有的 epoll 实例
        if (mEpollFd >= 0) {
            close(mEpollFd);
        }
    
        // 创建新的 epoll 实例并注册管道
        // EPOLL_SIZE_HINT = 8 表示 mEpollFd 最多可以关注 8 个fd
        mEpollFd = epoll_create(EPOLL_SIZE_HINT);
        // 设置监听的事件
        struct epoll_event eventItem;
        // 将未使用的数据区域置0
        memset(& eventItem, 0, sizeof(epoll_event));
        // 对应的文件描述符上有可读数据事件
        eventItem.events = EPOLLIN;
        eventItem.data.fd = mWakeEventFd;
        // 将唤醒事件 mWakeEventFd 注册到 epoll 实例
        int result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeEventFd, & eventItem);
    
        for (size_t i = 0; i < mRequests.size(); i++) {
            const Request& request = mRequests.valueAt(i);
            struct epoll_event eventItem;
            request.initEventItem(&eventItem);
            // 将 request 队列的事件添加到 epoll 实例
            int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, request.fd, & eventItem);
        }
    }
    
    

    Native 层的 Looper 对象创建时做了如下操作

    • 创建一个事件通知的文件描述符 mWakeEventFd,并将其置为 EFD_NONBLOCK;
    • 创建 epoll 实例(IO多路复用机制,可同时监控多个fd),设置 epoll 的触发事件 eventItem 并将 mWakeEventFdmRequest 注册到 epoll ;

    小结

    消息队列的创建过程涉及到 Java 层和 Native 层,可以看到 Java 层的 LooperMessageQueue 对象的创建顺序和 Native 层刚好是相反的

    • Looper 对象为线程单例,同时持有一个消息队列 MessageQueue 的引用;
    • 消息队列 MessageQueue 在创建的同时会创建一个 Native 层的消息队列 NativeMessageQueue 对象将其指针地址保存在 mPtr 中;
    • Native 层的 Looper 对象会创建 mWakeEventFd 文件描述符和 epoll IO多路复用实例 mEpollFd 监听 mWakeEventFd 的 IO 读写事件;

    2.消息循环过程

    frameworks\base\core\java\android\os\Looper.java

    public static void loop() {
            // 获取当前线程的 Looper 对象
            final Looper me = myLooper();
            if (me == null) {   // 执行 loop 函数之前必须先调用 prepare() 函数创建 looper 对象
                throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread.");
            }
            // 获取 looper 对象中的消息队列
            final MessageQueue queue = me.mQueue;
    
            for (;;) {
                // 获取消息队列中的消息,没有消息处理时阻塞线程
                Message msg = queue.next(); // might block
            
                 try {
                    // msg.target 即为 Handler,调用 Handler 的 dispatchMessage() 分发消息
                    msg.target.dispatchMessage(msg);
                   ...
                } finally {
                   ...
                }
            
            }
    }
    

    消息队列创建完成之后,调用 Looper.loop() 函数启动消息循环执行如下操作

    • 获取当前线程的消息队列 MessageQueue
    • 启动 for 循环,不断的调用 queue.next() 函数查询是否有需要处理的消息,如果没有需要处理的消息,当前线程会在 next() 中睡眠;
    • 调用 msg.target.dispatchMessage() 分发处理消息,msg.target 即为 Handler;

    frameworks\base\core\java\android\os\MessageQueue.java

    
    // 消息队列,要处理的消息列表
    Message mMessages;
    
    Message next() {
            // 注册到消息队列中的空闲消息处理器 (IdelHandler) 的数量
            int pendingIdleHandlerCount = -1;
            // 消息队列处理下一个消息需要进入睡眠状态等待的时间
            // 0: 即使当前消息队列没有新的消息处理,当前线程也不要进入睡眠等待状态
            // -1: 消息队列没有新的消息需要处理时,当前线程需要进入睡眠状态,直到它被其他线程唤醒
            int nextPollTimeoutMillis = 0;
            for (;;) {
                if (nextPollTimeoutMillis != 0) {
                    Binder.flushPendingCommands();
                }
                // 检查当前线程是否有新的消息需要处理
                nativePollOnce(ptr, nextPollTimeoutMillis);
    
                synchronized (this) {
                    // Try to retrieve the next message.  Return if found.
                    final long now = SystemClock.uptimeMillis();
                    Message prevMsg = null;
                    Message msg = mMessages;
                    // msg.target == null 表示 msg 为同步消息屏障
                    if (msg != null && msg.target == null) {
                        // msg 为同步屏障消息,通过do while 循环获取链表里的第一条异步消息
                        do {
                            prevMsg = msg;
                            msg = msg.next;
                        } while (msg != null && !msg.isAsynchronous());
                    }
                    if (msg != null) {  // 消息不为空
                        if (now < msg.when) {
                            // 还未到达消息处理的时间,计算线程需要睡眠的时间
                            nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
                        } else {// 新消息需要处理,将消息从队列中取出并返回
                            // 标记当前线程已经从睡眠中唤醒
                            mBlocked = false;
                            if (prevMsg != null) {  // msg 为异步消息,更新消息队列
                                prevMsg.next = msg.next;
                            } else {    // msg 为同步消息,更新列表的头部
                                mMessages = msg.next;
                            }
                            msg.next = null;
                            msg.markInUse();
                            return msg; // 返回需要处理的 msg
                        }
                    } else {    // 消息队列没有消息,当前线程进入睡眠状态
                        nextPollTimeoutMillis = -1;
                    }
    
                    // 用于退出消息
                    if (mQuitting) {
                        dispose();
                        return null;
                    }
    
                    // 判断是否有空闲消息处理器
                    if (pendingIdleHandlerCount < 0
                            && (mMessages == null || now < mMessages.when)) {
                        pendingIdleHandlerCount = mIdleHandlers.size();
                    }
                    if (pendingIdleHandlerCount <= 0) { // 如果没有空闲消息需要处理,continue 进入睡眠
                        mBlocked = true;
                        continue;
                    }
    
                    if (mPendingIdleHandlers == null) {
                        mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)];
                    }
                    mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers);
                }
                // 循环处理空闲消息
                for (int i = 0; i < pendingIdleHandlerCount; i++) {
                    final IdleHandler idler = mPendingIdleHandlers[i];
                    mPendingIdleHandlers[i] = null; // release the reference to the handler
    
                    boolean keep = false;
                    try {
                        keep = idler.queueIdle();
                    } catch (Throwable t) {
                        Log.wtf(TAG, "IdleHandler threw exception", t);
                    }
    
                    if (!keep) {
                        synchronized (this) {
                            mIdleHandlers.remove(idler);
                        }
                    }
                }
                // 空闲消息处理完成,重置为0
                pendingIdleHandlerCount = 0;
        
                nextPollTimeoutMillis = 0;
            }
        }
    

    queue.next() 函数内部同样维护了一个 for 循环来获取需要处理的消息,主要功能为

    • 调用 nativePollOnce() 函数检查当前线程的消息队列中是否有新的消息需要处理,如果需要进入睡眠等待状态,睡眠的时间由 nextPollTimeoutMillis 决定,其取值为

      • 0: 当前线程没有消息处理,也不需要进入睡眠状态;
      • -1: 当前线程进入无限期睡眠直到被其它线程唤醒;
      • 大于0: 当前线程需要睡眠的时间为 nextPollTimeoutMillis;
    • 当前线程从 nativePollOnce() 函数返回之后会获取 mMessages 链表中是否有需要处理的消息

      • 没有消息(mMessage == null): 设置 nextPollTimeoutMillis = -1 当前线程进入无限期睡眠;
      • 有消息但还未到触发时间(now < msg.when): 设置 nextPollTimeoutMillis = msg.when - now 当前线程进入睡眠,时间为差值;
      • 有消息且满足触发时间: 将消息移出 mMessages 链表并返回,等待 Looper.loop() 函数的下一次 queue.next();
    • 当前线程如果没有消息处理,根据 pendingIdleHandlerCount 判断是否由空闲的消息处理器来处理消息;

    frameworks\base\core\jni\android_os_MessageQueue.cpp

    static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,
            jlong ptr, jint timeoutMillis) {
        // prt 即为 NativeMessageQueue 的指针对象
        NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
        nativeMessageQueue->pollOnce(env, obj, timeoutMillis);
    }
    
    void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {
        mPollEnv = env;
        // pollObj 对象即为 MessageQueue 对象
        mPollObj = pollObj;
        // 调用 Native 层的 Looper 的 pollOnce 函数
        mLooper->pollOnce(timeoutMillis);
        mPollObj = NULL;
        mPollEnv = NULL;
    
        if (mExceptionObj) {
            env->Throw(mExceptionObj);
            env->DeleteLocalRef(mExceptionObj);
            mExceptionObj = NULL;
        }
    }
    

    system\core\libutils\Looper.cpp

    
    int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
        int result = 0;
        for (;;) {
        ...
            if (result != 0) {  // result != 0 表示有新的消息需要处理,跳出循环
                if (outFd != NULL) *outFd = 0;
                if (outEvents != NULL) *outEvents = 0;
                if (outData != NULL) *outData = NULL;
                return result;
            }
            // result == 0 表示没有消息需要处理
            // 由于位于 for 循环内部,没有需要处理的消息时,pollInner() 会被不断地调用
            result = pollInner(timeoutMillis);
        }
    }
    
    int Looper::pollInner(int timeoutMillis) {
        ...
        // Poll.
        int result = POLL_WAKE;
    
        struct epoll_event eventItems[EPOLL_MAX_EVENTS];
        // 监听 mEpollFd 实例中的文件描述符的 IO 读写事件
        // 如果这些文件描述符(fd)都没有发生 IO 读写事件,则当前线程会在 epoll_wait 中进入睡眠等待,睡眠的时间为 timeoutMillis
        int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
        ...
        // 检查是哪一个文件描述符发生了 IO 读写事件
        for (int i = 0; i < eventCount; i++) {
            int fd = eventItems[i].data.fd;
            uint32_t epollEvents = eventItems[i].events;
            if (fd == mWakeEventFd) {   // mWakeEventFd 即为 Looper 对象创建的用于监听消息的文件描述符
                if (epollEvents & EPOLLIN) {
                // 如果是 mWakeEventFd 文件描述符 && mWakeEventFd 发生的 IO 读写事件类型为 EPOLLIN
                // 则唤醒当前线程处理消息
                    awoken();
                } else {
            ...
            }
    
            } else {
               ...
            }
        }
        ...
    
        return result;
    }
    
    // 唤醒当前线程,读取文件描述符中的数据
    void Looper::awoken() {
        uint64_t counter;
        // 循环读取文件描述符中的数据
        TEMP_FAILURE_RETRY(read(mWakeEventFd, &counter, sizeof(uint64_t)));
    }
    

    nativePollOnce() 最终调用的是 Looper(native层) 对象的 pollOnce(),通过 for 循环不断的调用 pollInner() 函数来判断是否有新的消息需要处理

    • result != 0: 有新消息需要处理,跳出循环;
    • result == 0: 没有新消息处理,继续循环调用 pollInner();

    pollInner() 调用 epoll_wait 监听前面创建的的 epoll 实例中的文件描述符的 IO 读写事件

    • 没有 IO 读写事件: 当前进程在 epoll_wait 函数中睡眠,时间由 timeoutMillis 决定;
    • epoll_wait函数返回之后: 通过 for 循环判断是否是 mWakeEventFd 文件描述符发生了 EPOLLIN IO 写入事件,如果是,调用 awoken() 函数唤醒当前线程将文件描述符中的数据读取出来;

    3.消息的发送和处理

    3.1 消息发送

    Android 系统提供了一个 Handler 类用来向一个线程消息队列发送消息

    frameworks\base\core\java\android\os\Handler.java

    public class Handler {
        ...
    
        public final boolean sendMessage(Message msg)
        {
            return sendMessageDelayed(msg, 0);
        }
        
        public final boolean post(Runnable r)
        {
           return  sendMessageDelayed(getPostMessage(r), 0);
        }
    
        public final boolean sendMessageDelayed(Message msg, long delayMillis)
        {
            if (delayMillis < 0) {
                delayMillis = 0;
            }
            return sendMessageAtTime(msg, SystemClock.uptimeMillis() + delayMillis);
        }
    
        public boolean sendMessageAtTime(Message msg, long uptimeMillis) {
            MessageQueue queue = mQueue;
            if (queue == null) {
                return false;
            }
            return enqueueMessage(queue, msg, uptimeMillis);
        }
        
        private boolean enqueueMessage(MessageQueue queue, Message msg, long uptimeMillis) {
            // 将 Handler 设置成 msg 的 target
            msg.target = this;
            // mAsynchronous == true 表示发送的是异步消息
            if (mAsynchronous) {
                msg.setAsynchronous(true);
            }
            // 将 msg 加入消息队列
            return queue.enqueueMessage(msg, uptimeMillis);
        }
        
        ...
    }
    

    Handler 类提供了多个重载函数用于发送各种消息,这些函数最终都会调用 enqueueMessage() 其功能如下

    • Handler 设置成 msg 的 target;
    • mAsynchronus == true,则将 msg 置为异步消息
    • 将 msg 加入消息队列

    frameworks\base\core\java\android\os\MessageQueue.java

    boolean enqueueMessage(Message msg, long when) {
    
        synchronized (this) {
            if (mQuitting) {    // 如果消息队列正在退出,则回收消息并退出
                msg.recycle();
                return false;
            }
    
            msg.markInUse();    //  标记 msg 为正在被使用
            msg.when = when;    // 设置消息触发的时间
            // 获取链表的第一条消息
            // mMessages 根据 msg.when 从小到大排列
            Message p = mMessages;
            boolean needWake;
            if (p == null || when == 0 || when < p.when) {
                // 如果链表为空 || 当前 msg 的优先级最大 || 当前 msg 触发时间 < 系统时间
                // 则设置 msg 为链表的第一条消息,优先执行
                msg.next = p;
                mMessages = msg;
                // mBlocked: 记录目标线程是否处于睡眠状态, true 表示当前线程正在睡眠
                // needWake: 记录本次消息插入后,是否需要执行唤醒当前线程的操作
                needWake = mBlocked;
            } else {
                // 当前线程正在睡眠 && p 为同步消息屏障 && msg 为异步消息
                // 需要唤醒当前线程执行异步消息
                needWake = mBlocked && p.target == null && msg.isAsynchronous();
                Message prev;
                for (;;) {
                    prev = p;
                    p = p.next;
                    // 根据 msg.when 找到 msg 插入的位置为 prev 后面
                    if (p == null || when < p.when) {
                        break;
                    }
                    if (needWake && p.isAsynchronous()) {
                        needWake = false;
                    }
                }
            // 插入 msg
                msg.next = p;
                prev.next = msg;
            }
            // 当前线程正在睡眠,需要执行唤醒操作
            if (needWake) {
                nativeWake(mPtr);
            }
        }
        return true;
    }
    

    可以看到 MessageQueue 内部的的消息链表mMessages 是根据 msg.when 消息发送的时间来排序的,enqueueMessage() 在将 msg 插入消息队列之后会判断是否需要执行 nativeWake() 唤醒当前线程;

    frameworks\base\core\jni\android_os_MessageQueue.cpp

    void NativeMessageQueue::wake() {
        mLooper->wake();
    }
    

    system\core\libutils\Looper.cpp

    void Looper::wake() {
        uint64_t inc = 1;
        // 向 mWakeEventFd 文件描述符中写入一个数据
        ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd, &inc, sizeof(uint64_t)));
        ...
    }
    

    nativeWake() 函数用于唤醒当前线程,通过 wake() 会向 mWakeEventFd 文件描述符写入数据,此时会触发 epoll 的监听事件,从第二节 pollInner() 中的 epoll_wait 函数中唤醒当前线程,最终回到 MessageQueuenext() 中返回 msg;

    3.2 消息处理

    frameworks\base\core\java\android\os\Looper.java

    public static void loop() {
        ...
        for (;;) {
            // 从消息队列中拿到了需要处理的消息
            Message msg = queue.next();
            if (msg == null) {
                return; // msg == null 表示退出消息循环
            }
            ...
            try {
                msg.target.dispatchMessage(msg);
               
            } finally {
               
            }
            ...
            // 回收 msg 防止重复创建
            msg.recycleUnchecked();
        }
    }
    

    frameworks\base\core\java\android\os\Handler.java

    public void dispatchMessage(Message msg) {
        // 1. 发送的 msg 设置了 callback ,触发 msg 的 callback (handler.post())
        if (msg.callback != null) {
            handleCallback(msg);
        } else {
            // 2. 创建 Handler 时设置了 callback,触发 handler 的 callback
            if (mCallback != null) {
                if (mCallback.handleMessage(msg)) {
                    return;
                }
            }
            // 3. 正常情况下会走该分支,覆写 Handler.handleMessage() 处理消息
            handleMessage(msg);
        }
    }
    

    queue.next() 回来之后会触发 handler(msg.target) 的 dispatchMessage() 分发 msg

    • msg.callback != null: 触发 msgcallback,通常是 handler.post() 发送的消息;
    • mCallback != null: 触发 handlercallbackmCallbackhander的重载构造函数中初始化;
    • handleMessage(): 一般我们重写这个函数来处理消息;

    相关文章

      网友评论

          本文标题:Handler消息机制

          本文链接:https://www.haomeiwen.com/subject/tzvhektx.html