美文网首页
Handler、Looper、Message、MessageQu

Handler、Looper、Message、MessageQu

作者: ArcherZang | 来源:发表于2020-03-19 20:26 被阅读0次

    版权声明:本文为CSDN博主「风再起时与不羁的风」的原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接及本声明。
    原文链接:https://blog.csdn.net/chewbee/article/details/78108201

    Handler消息机制native层

    1.简介

    在介绍Handler消息机制(Java层)时,我们看到了Java层的MessageQueue中调用了几个native方法。除此之外,native层也有一套完善的消息机制,用于处理native的消息。

    首先来看下native层handler消息的类图:

    从图中可以看到,native层的消息机制关键元素和Java层的消息机制的关键元素基本一致,分别包括Looper、MessageQueue、Handler、Message这几个关键元素。除了MessageQueue是Java层与Native层相关联的,其余的几个元素都是不相关联的,但元素的功能基本是一致的,只是Java层和Natvie层的实现方式不一样的。

    接下来将从MessageQueue中的native方法开始分析Native层的Handler消息机制。

    2.MessageQueue的native方法

    在Java层的MessageQueue中定义了几个native方法:

    public final class MessageQueue {
        ......
        private native static long nativeInit();
        private native static void nativeDestroy(long ptr);
        private native void nativePollOnce(long ptr, int timeoutMillis); /*non-static for callbacks*/
        private native static void nativeWake(long ptr);
        private native static boolean nativeIsPolling(long ptr);
        private native static void nativeSetFileDescriptorEvents(long ptr, int fd, int events);
        ......
    }
    

    nativeInit方法

    在构建Java层的MessageQueue时,会调用nativeInit()方法进行初始化。

    2.1 MessageQueue()

    MessageQueue(boolean quitAllowed) {
        mQuitAllowed = quitAllowed;
        mPtr = nativeInit();// 保存native层的MessageQueue引用,见2.1
    }
    

    在构建MessageQueue的过程中,会通过JNI调用native层的android_os_MessageQueue_nativeInit方法进行初始化。

    2.2 android_os_MessageQueue_nativeInit()

    static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) {
        NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();// 构建MessageQueue,见2.3
        if (!nativeMessageQueue) {
            jniThrowRuntimeException(env, "Unable to allocate native queue");
            return 0;
        }
    
        nativeMessageQueue->incStrong(env);
        return reinterpret_cast<jlong>(nativeMessageQueue);//返回到Java层
    }
    

    2.3 NativeMessageQueue()

    NativeMessageQueue::NativeMessageQueue() :
        mPollEnv(NULL), mPollObj(NULL), mExceptionObj(NULL) {
        mLooper = Looper::getForThread();//获取该线程关联的Looper
        if (mLooper == NULL) {
            mLooper = new Looper(false);// 创建一个Looper对象,见2.4
            Looper::setForThread(mLooper);// 将该Looper保存到线程的TLS中
        }
    }
    

    在Native层的MessageQueue中,包含了一个Looper对象,该Looper对象是与线程相关的,保存在线程的TLS中,这个与Java层的处理方式一样。如果该线程的Looper对象存在,则直接返回;如果不存在,则创建一个Looper,并保存到该线程的TLS中。

    2.4 Looper()

    Looper::Looper(bool allowNonCallbacks) :
        mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false),
        mPolling(false), mEpollFd(-1), mEpollRebuildRequired(false),
        mNextRequestSeq(0), mResponseIndex(0), mNextMessageUptime(LLONG_MAX) {
    mWakeEventFd = eventfd(0, EFD_NONBLOCK);// 构建一个唤醒事件的文件描述符fd
    
    AutoMutex _l(mLock);
    rebuildEpollLocked();// 重构epoll事件,见2.5
    }
    

    eventfd函数会创建一个事件对象(eventfd object),用来实现进程(线程)间的等待(wait)/通知(notify)机制(Linux管道pipe也可以实现该功能),内核会为这个对象维护一个64位的计数器。eventfd的函数定义如下:

    #include<sys/eventfd.h>  
    int eventfd(unsigned int initval,int flags);
    

    第一个参数initval用来初始化这个计数器,一般初始化为0;
    第二个参数为标志位,可选的值为:

    • EFD_NONBLOCK:功能同O_NONBLOCK,设置对象为非阻塞状态
    • EFD_CLOEXEC:这个标识被设置的话,调用exec后会自动关闭文件描述符,防止泄漏。

    在Linux2.6.26版本之前,flags必须设置为0.

    eventfd函数返回一个新的文件描述符,这个文件描述符可以支持以下操作:

    read:如果计数值counter的值不为0,读取成功,获得到该值。如果counter的值为0,非阻塞模式下,会直接返回失败,并且把errno的值设为EINVAL。如果为阻塞模式,则会一直阻塞到counter为非0为止。

    write:将缓存区写入的8字节整形值加到内核计数器上。

    在构造native层的Looper时,会通过eventfd函数创建一个唤醒文件描述符,利用IO多路复用机制epoll可以监听该描述符,实现线程间等待/通知。

    2.5.Looper.rebuildEpollLocked()

    void Looper::rebuildEpollLocked() {
    // 如果有新的epoll事件,则把原来的epoll事件关闭
    if (mEpollFd >= 0) {
        close(mEpollFd);
    }
    
    // Allocate the new epoll instance and register the wake pipe.
    // 创建一个epoll实例,并注册wake管道
    mEpollFd = epoll_create(EPOLL_SIZE_HINT);
    
    struct epoll_event eventItem;
    memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
    eventItem.events = EPOLLIN;// 监听可读事件
    eventItem.data.fd = mWakeEventFd;// 监控的fd为唤醒事件fd
    // 注册唤醒事件mWakeEventFd到epoll实例中
    int result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeEventFd, & eventItem);
    // 如果请求队列中有数据,则还需要将请求中的事件注册到epoll实例中
    for (size_t i = 0; i < mRequests.size(); i++) {
        const Request& request = mRequests.valueAt(i);
        struct epoll_event eventItem;
        request.initEventItem(&eventItem);// 初始化请求事件,见2.6
        // 注册请求中的事件到epoll实例中
        int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, request.fd, & eventItem);
        if (epollResult < 0) {
            ALOGE("Error adding epoll events for fd %d while rebuilding epoll set, errno=%d",
                    request.fd, errno);
        }
    }
    }
    

    在rebuildEpollLocked方法中,通过Linux系统的epoll机制,来实现线程间的等待与唤醒操作。Linux系统的epoll机制是为处理大批量句柄而产生的,是Linux下多路复用IO接口select/poll的增强版。epoll除了监控通过eventfd创建的唤醒文件描述符外,还可以监控其他的文件描述符,因为Loope提供了addFd接口来添加文件描述符到Looper对象中,当这些被监控的文件符上有事情发生时,就会唤醒相应的线程来处理。在这里我们只关心通过eventfd创建的唤醒文件描述符。

    epoll机制是包含了epoll_create、epoll_ctl和epoll_wait三个方法,通过epoll_create来创建一个epoll专用的文件描述符。通过epoll_ctl函数来告诉需要监控文件描述符的什么事件,这里表示需要监控mWakeEventFd文件描述符的EPOLLIN事件,即当管道中有内容可读时,就唤醒当前正在等待管道中内容的线程。

    2.6 Looper.Request.initEventItem()

    void Looper::Request::initEventItem(struct epoll_event* eventItem) const {
        int epollEvents = 0;
        if (events & EVENT_INPUT) epollEvents |= EPOLLIN;// 如果请求可读,则监听可读事件
        if (events & EVENT_OUTPUT) epollEvents |= EPOLLOUT;// 如果请求可写,则监听可写事件
    
        memset(eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
        eventItem->events = epollEvents;// 监听的事件类型
        eventItem->data.fd = fd;// 监听的文件描述符fd
    }
    

    总结:

    在nativeInit方法中,主要是创建一个native层的MessageQueue,同时创建一个Looper对象,并在Looper中注册监听wake事件以及Request队列中的事件。当Java层的消息队列中没有消息时,就使Android应用程序主线程进入等待状态,而当Java层的消息队列中来了新的消息后,就唤醒Android应用程序的主线程来处理这个消息。

    nativePollOnce方法

    在Java层MessageQueue的next方法中,会阻塞调用native层的nativePollOnce方法来获取消息队列中的消息。

    3.1 MessageQueue.next()

    Message next() {
        ......
        for (;;) {
            nativePollOnce(ptr, nextPollTimeoutMillis);见3.2
        }
        ......
    }
    

    3.2 nativePollOnce()

    static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,
        jlong ptr, jint timeoutMillis) {
        NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
        nativeMessageQueue->pollOnce(env, obj, timeoutMillis);// 调用pollOnce方法,见3.3
    }
    

    3.3 NativeMessageQueue.pollOnce()

    void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {
        mPollEnv = env;
        mPollObj = pollObj;
        mLooper->pollOnce(timeoutMillis);// 调用Looper中的pollOnce方法,见3.4
        mPollObj = NULL;
        mPollEnv = NULL;
    
        if (mExceptionObj) {
            env->Throw(mExceptionObj);
            env->DeleteLocalRef(mExceptionObj);
            mExceptionObj = NULL;
        }
    }
    

    3.4 Looper.pollOnce()

    inline int pollOnce(int timeoutMillis) {
        return pollOnce(timeoutMillis, NULL, NULL, NULL); 
    }
    
    int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
        int result = 0;
        for (;;) {
            // 循环处理响应列表中的事件
            while (mResponseIndex < mResponses.size()) {
            const Response& response = mResponses.itemAt(mResponseIndex++);
            int ident = response.request.ident;
            if (ident >= 0) {// 处理没有callback的事件
                int fd = response.request.fd;
                int events = response.events;
                void* data = response.request.data;
                if (outFd != NULL) *outFd = fd;
                if (outEvents != NULL) *outEvents = events;
                if (outData != NULL) *outData = data;
                return ident;
            }
        }
    
        if (result != 0) {
            if (outFd != NULL) *outFd = 0;
            if (outEvents != NULL) *outEvents = 0;
            if (outData != NULL) *outData = NULL;
            return result;
        }
        
        result = pollInner(timeoutMillis);// 内部轮询处理,见3.5
    }
    }
    

    pollOnce()方法等待事件准备好,有一个可选的超时时间,timeoutMilllis有以下取值:

    • 0,立即返回,没有阻塞;
    • 负数,一直阻塞,直到事件发生;
    • 正数,表示最多等待多久时间;

    pollOnce方法的返回值可以有以下几个取值:

    • POLL_WAKE,在超时之前,通过wake()方法唤醒的,没有callbacks被触发,没有文件描述符准备好了,即pipe写端的write事件触发;
    • POLL_CALLBACK,在一个或多个文件描述符被触发了;
    • POLL_TIMEOUT,在超时之前,没有数据准备好了,表示等待超时;
    • POLL_ERROR,发生了错误;
    • 大于0的正数,返回数据已经准备好了的fd;

    3.5 Looper.pollInner()

    int Looper::pollInner(int timeoutMillis) {
    
    if (timeoutMillis != 0 && mNextMessageUptime != LLONG_MAX) {
        nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);//记录当前时间
        int messageTimeoutMillis = toMillisecondTimeoutDelay(now, mNextMessageUptime);// 更新消息超时时间
        if (messageTimeoutMillis >= 0
                && (timeoutMillis < 0 || messageTimeoutMillis < timeoutMillis)) {
            timeoutMillis = messageTimeoutMillis;
        }
    }
    
    // 事件处理类型为POLL_WAKE
    int result = POLL_WAKE;
    mResponses.clear();
    mResponseIndex = 0;
    
    // We are about to idle.
    mPolling = true;
    
    struct epoll_event eventItems[EPOLL_MAX_EVENTS];
    int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);//阻塞等待事件发生
    
    // No longer idling.
    mPolling = false;
    
    // Acquire lock.
    mLock.lock();
    
    // Rebuild epoll set if needed.
    if (mEpollRebuildRequired) {
        mEpollRebuildRequired = false;
        rebuildEpollLocked();
        goto Done;
    }
    
    //Poll错误
    if (eventCount < 0) {
        if (errno == EINTR) {
            goto Done;
        }
        ALOGW("Poll failed with an unexpected error, errno=%d", errno);
        result = POLL_ERROR;
        goto Done;
    }
    
    //Poll超时
    if (eventCount == 0) {
        result = POLL_TIMEOUT;
        goto Done;
    }
    
    // 1.处理所有已经准备好的事件,包含唤醒事件以及Request队列中的事件
    for (int i = 0; i < eventCount; i++) {
        int fd = eventItems[i].data.fd;//文件描述符
        uint32_t epollEvents = eventItems[i].events;//事件掩码
        if (fd == mWakeEventFd) {//如果是唤醒事件
            if (epollEvents & EPOLLIN) {
                awoken();//唤醒,见3.6
            } else {
                ALOGW("Ignoring unexpected epoll events 0x%x on wake event fd.", epollEvents);
            }
        } else {//请求队列中的事件
            ssize_t requestIndex = mRequests.indexOfKey(fd);
            if (requestIndex >= 0) {
                int events = 0;
                if (epollEvents & EPOLLIN) events |= EVENT_INPUT;
                if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT;
                if (epollEvents & EPOLLERR) events |= EVENT_ERROR;
                if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP;
                pushResponse(events, mRequests.valueAt(requestIndex));//将请求事件放入Response数组中
            } else {
                ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "
                        "no longer registered.", epollEvents, fd);
            }
        }
    }
    Done: ;
    
    // 2.处理所有MessageEnvelopes的消息回调
    mNextMessageUptime = LLONG_MAX;
    while (mMessageEnvelopes.size() != 0) {
        nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
        const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);
        // 如果消息的处理时间小于当前时间,则将该消息从列表中移除
        if (messageEnvelope.uptime <= now) {
            // Remove the envelope from the list.
            // We keep a strong reference to the handler until the call to handleMessage
            // finishes.  Then we drop it so that the handler can be deleted *before*
            // we reacquire our lock.
            { // obtain handler
                sp<MessageHandler> handler = messageEnvelope.handler;//处理消息的handler
                Message message = messageEnvelope.message;//获取消息
                mMessageEnvelopes.removeAt(0);
                mSendingMessage = true;
                mLock.unlock();
    
                handler->handleMessage(message);//处理消息
            } // release handler
        
            mLock.lock();
            mSendingMessage = false;
            result = POLL_CALLBACK;//事件处理类型为POLL_CALLBACK
        } else {
            mNextMessageUptime = messageEnvelope.uptime;//更新下一个消息的处理时间
            break;
        }
    }
    
    // Release lock.
    mLock.unlock();
    
    // 3.处理所有Responses回调
    for (size_t i = 0; i < mResponses.size(); i++) {
        Response& response = mResponses.editItemAt(i);
        // 如果响应类型为POLL_CALLBACK
        if (response.request.ident == POLL_CALLBACK) {
            int fd = response.request.fd;
            int events = response.events;
            void* data = response.request.data;
            int callbackResult = response.request.callback->handleEvent(fd, events, data);//调用callback类型的handleEvent方法
            if (callbackResult == 0) {
                removeFd(fd, response.request.seq);
            }
            response.request.callback.clear();
            result = POLL_CALLBACK;//事件处理类型为POLL_CALLBACK
        }
    }
    return result;
    }
    

    pollInner()方法首先处理epoll_wait()方法返回的准备好的事件,接着处理MessageEnvelopes队列中准备好的事件,最后处理响应Responses队列中的准备好事件。

    如果mWakeEventFd文件描述符上发生了EPOLLIN就说明应用程序中的消息队列里面有新的消息需要处理了,接下来它就会先调用awoken函数清空管道中的内容,以便下次再调用pollinner函数时,知道自上次处理完消息队列中的消息后,有没有新的消息加进来。

    6.Looper.awoken()

    void Looper::awoken() {
        uint64_t counter;
        TEMP_FAILURE_RETRY(read(mWakeEventFd, &counter, sizeof(uint64_t)));//不断读取管道数据,目的就是为了清空管道内容
    }
    

    函数awoken的实现很简单,它只把管道中内容读取出来,清空管道,以便下次调用epoll_wait时可以再次阻塞等待。

    总结:在nativePollOnce()过程中,首先处理Response数组中非POLL_CALLBACK类型的事件,接着处理epoll_wait()方法返回准备好的事件,然后是处理MessageEnvelopes队列中事件,最后是处理Response队列中的POLL_CALLBACK类型事件。事件的处理顺序是:

    1.Response数组中非POLL_CALLBACK类型的事件 > 2.epoll_wait()返回的wake事件 > 3.处理MessageEnvelopes队列中的事件 > 4.处理Response数组中POLL_CALLBACK类型的事件

    当epoll_wait()返回时,可能处理以下几种情况:

    1. POLL_ERROR,发生了错误;
    2. POLL_TIMEOUT,发生了超时错误;
    3. 返回了准备好的事件,有两种类型的事件:一种是唤醒事件,管道已经有数据可读了;另外一种是其他类型的事件,放入到Response数组中处理。

    nativeWake方法

    nativeWake用于唤醒功能,在添加消息到消息队列enqueueMessage(),或者把消息从消息队列中全部移除quit()时,可能需要调用nativeWake方法。

    4.1 MessageQueue.enqueueMessage()

    boolean enqueueMessage(Message msg, long when) {
        ......
        if (needWake) {
            nativeWake(mPtr);
        }
        ......
    }
    

    4.2 android_os_MessageQueue_nativeWake()

    static void android_os_MessageQueue_nativeWake(JNIEnv* env, jclass clazz, jlong ptr) {
        NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
        nativeMessageQueue->wake();//调用NatvieMessageQueue的wake方法,见4.3
    }
    

    4.3 NativeMessageQueue.wake()

    void NativeMessageQueue::wake() {
        mLooper->wake();//调用Looper的wake方法,见4.4
    }
    

    4.4 Looper.wake()

    void Looper::wake() {
        uint64_t inc = 1;
        ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd, &inc, sizeof(uint64_t)));//向管道mWakeEventFd写入字符1
        if (nWrite != sizeof(uint64_t)) {
            if (errno != EAGAIN) {
            ALOGW("Could not write wake signal, errno=%d", errno);
            }
        }
    }
    

    总结:nativeWake()方法主要是向管道mWakeEventFd中写入字符1,唤醒在mWakeEventFd上等待的事件。往文件描述中写入内容的目的是为了唤醒应用程序的主线程,当应用程序的消息队列中没有消息处理时,应用程序的主线程就会进入空闲等待状态,而这个空闲等待状态就是通过调用Looper类的pollInner函数进入的,具体就是在pollInner函数中调用epoll_wait函数来等待管道中有内容可读的。

    当文件描述符有内容可读时,应用程序的主线程就会从Looper的pollInner函数返回到JNI层的nativePollOnce函数,最后返回到Java层的MessageQueue.next函数中去,在这里就会发现消息队列中有新的消息需要处理了,于是就处理这个消息。

    3.总结

    Java层的MessageQueue通过mPtr变量保存了一个NativeMessageQueue对象,从而使得MessageQueue成为Java层和Native层的衔接桥梁,既能处理上层消息,也能处理native层消息。

    Java层和Native层的MessageQueue是通过JNI建立关联的,彼此之间能相互调用。

    消息的处理优先级是从先处理Native层的Message,再处理Native层的Request,最后处理Java层的Message。

    Android消息机制2-Handler(Native层)
    Android应用程序消息处理机制(Looper、Handler)

    相关文章

      网友评论

          本文标题:Handler、Looper、Message、MessageQu

          本文链接:https://www.haomeiwen.com/subject/fohryhtx.html