美文网首页
由浅入深全面分析Handler机制原理之源码<难点>

由浅入深全面分析Handler机制原理之源码<难点>

作者: 安仔夏天勤奋 | 来源:发表于2020-09-21 15:22 被阅读0次

    前言

    下面的内容基于由浅入深全面分析Handler机制原理之源码的理解,扩展的Handler机制的难点分析。

    目录

    • 内存共享(如何切换线程的)
    • prepare()函数中,使用ThreadLocal存放Looper对象,ThreadLocal的作用。
    • Message使用对象复用池(享元设计模式)
    • Handler的阻塞/唤醒机制
    • Handler的同步屏障

    来,上一张大纲图,醒一下神

    image

    Handler如何切换线程

    当我们都了解了Handler的原理,再结合上述的使用例子,我们也对Handler是如何切换的,其实情况也很明朗了。

    1. 创建Message对象,我们都是知道创建一个对象,对象一般都是存放在堆内存,在JVM中堆是在线程共享区域,并不是存放在线程私有数据区,所以说所有线程都可以访问得到这个Message对象。
    2. 在子线程中,Handler发送一条Message,Message会插入到MessageQueue中。有人会问MessageQueue是属于那个线程,其实MessageQueue并不是属于那个线程,只仅仅是存放Message的容器而已,就相当于List一样,只不过MessageQueue比较特殊,是一个队列的形式而已。
    3. 当Looper.loop()函数,不断的从MessageQueue取出Message,当获取到Message后,通过Handler.dispatchMessage(msg)函数去分发消费掉这条Message而已。

    Handler的线程如何切换是不是很简单,当然这都是你对Handler的原理熟悉掌握,并不难理解。下面通过一张图加深到Handler线程切换。

    image

    用ThreadLocal保存Looper对象,目的是什么?

    ThreadLocal是线程本地变量,它是一种特殊的变量,ThreadLocal为每个线程提供一个变量的副本,使得每一个线程在同一时间内访问到的不同对象,这样就隔离了线程之间对数据的一个共享访问,那么在内部实现上,ThreadLocal内部都有一个ThreadLocalMap,用来保存每个线程所拥有的变量的副本。

    子线程中创建Handler体现ThreadLocal的使用

    错误例子:直接在子线程创建一个Handler。

    new Thread(new Runnable() {
        @Override
        public void run() {
            new Handler(){
                @Override
                public void handleMessage(Message msg) {
                    super.handleMessage(msg);
                }
            };
        }
    }).start();
    
    

    直接抛出异常:java.lang.RuntimeException: Can't create handler inside thread that has not called Looper.prepare()

    直接看源码:

    public Handler(Callback callback, boolean async) {
       //...省略部分代码
        mLooper = Looper.myLooper();
        if (mLooper == null) {
            throw new RuntimeException(
                "Can't create handler inside thread that has not called Looper.prepare()");
        }
        //...省略部分代码
    }
    //Looper类
    private static void prepare(boolean quitAllowed) {
        if (sThreadLocal.get() != null) {
            throw new RuntimeException("Only one Looper may be created per thread");
        }
        sThreadLocal.set(new Looper(quitAllowed));
    }
    public static @Nullable Looper myLooper() {
        return sThreadLocal.get();
    }
    
    

    使用ThreadLocal 保存当前Looper对象,ThreadLocal 类可以对数据进行线程隔离,保证了在当前线程只能获取当前线程的Looper对象,同时prepare()函数保证当前线程有且只有一个Looper对象。所以在子线程中创建Handler对象,需要添加Looper.prepare(),如果只单单添加Looper.prepare()函数还是不行,缺少动力,也需要添加Looper.loop()函数。

    正确代码如下:

    new Thread(new Runnable() {
        @Override
        public void run() {
            Looper.prepare();//注意添加
            new Handler(){
                @Override
                public void handleMessage(Message msg) {
                    super.handleMessage(msg);
                }
            };
            Looper.loop();//注意添加
        }
    }).start();
    
    

    为什么可以直接new Message,还提供obtain()函数,为什么这样设计?

    如果不断的new出Meaage对象并插入到MessageQueue中,在jvm的堆中是不是会不断有新Message对象创建以及销毁,导致内存抖动,而GC线程虽然作为优先级最低的线程,此时因为必须GC,导致GC线程抢占CPU时间片,主线程拿不到CPU而卡顿调帧。Google在设计消息机制的时候就想到了消息复用机制,几乎所有Framework中发送消息都是通过Message.obtain()函数来进行消息复用。这种消息复用机制其实就是一种享元设计模式。享元设计模式就不展开了,感兴趣的可以自行查阅资料

    下面我们看看Message是如何复用的,首先看看Mesage里的几个成员变量:

    Message next;//形成一个链表,指向下一个Message
    private static final Object sPoolSync = new Object();//对象锁
    private static Message sPool;//头节点的消息
    private static int sPoolSize = 0;//当前链表的个数
    private static final int MAX_POOL_SIZE = 50;//链表最多存放的个数
    
    

    Looper的loop()函数中不断的从消息队列中取消息diapatch,分发之后会调用Message的回收操作。

    public static void loop() {
        //获取Looper对应的消息队列MessageQueue
        final MessageQueue queue = me.mQueue;
        //...省略部分代码
        for (;;) {//不断循环从消息队列中取出消息
            Message msg = queue.next(); //有可能阻塞
            if (msg == null) {//没有消息,则退出消息队列
                return;
            }
            //...省略部分代码
            //msg.target就是Handler,把获取到的消息分发出去
            msg.target.dispatchMessage(msg);  
            //...省略部分代码
            msg.recycleUnchecked();//回收Message
        }
    }
    
    

    当Message分发完之后,就调用recycleUnchecked()函数对Message进行回收。

    void recycleUnchecked() {
        flags = FLAG_IN_USE; // 标记改Message将加入消息池
        // 重置所有消息属性
        what = 0;
        arg1 = 0;
        arg2 = 0;
        obj = null;
        replyTo = null;
        sendingUid = -1;
        when = 0;
        target = null;
        callback = null;
        data = null;
    
        synchronized (sPoolSync) { // 线程安全锁
            if (sPoolSize < MAX_POOL_SIZE) { // MAX_POOL_SIZE = 50 ,表明消息池最多50个
                //头节点设置给Next 将当前对象最为最新的头节点sPool 
                next = sPool;  
                sPool = this;
                sPoolSize++;
            }
        }
    }
    
    

    所以获取一个对象池中的Message可以直接调用Message.obtain()函数即可。

    public static Message obtain() {
        synchronized (sPoolSync) {
            //sPool就是Looper.loop(),调用dispatchMessage函数后,调用recycleUnchecked()函数回收的Message
            if (sPool != null) {
               Message m = sPool;  //取出头节点
               sPool = m.next; // 将头节点的下一个作为最新的头节点
               m.next = null; // 设置需要返回的消息的next为空
               m.flags = 0; // 清除是否还在链表中
               sPoolSize--;  
               return m;
            }
        }
        //如果对象池中没有消息,就创建一个消息
        return new Message();
    }
    
    

    Handler的阻塞/唤醒机制

    首先我们要了解Handler的阻塞或唤醒是在那里发起的,经过上面的源码分析及Handler的整个调用过程,我们都知道先是创建一个Looper对象,然后创建一个Handler对象,最后调用Looper.loop()函数,在loop()函数中不断从MessageQueue.next()函数中的轮询获取Message。

    public static void loop() { 
        //...省略部分代码
        for (;;) {//不断循环从消息队列中取出消息
            Message msg = queue.next(); //有可能阻塞
            //...省略部分代码
            try {
                //msg.target就是Handler,把获取到的消息分发出去
                msg.target.dispatchMessage(msg);
            } finally {
            }
            //...省略部分代码
        }
    }
    
    Message next() {
        //...省略部分代码
        int nextPollTimeoutMillis = 0;
        for (;;) {
            nativePollOnce(ptr, nextPollTimeoutMillis);//根据nextPollTimeoutMillis是阻塞还唤醒
            synchronized (this) {
                Message msg = mMessages;
                if (msg != null && msg.target == null) {
                   //...省略部分代码
                }
                if (msg != null) {
                    if (now < msg.when) {
                        // 当头消息延迟时间大于当前时间,阻塞消息要到延迟时间和当前时间的差值
                        nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
                    } else {}
                   //...省略部分代码
                } else {
                    nextPollTimeoutMillis = -1;//队列已无消息,一直阻塞
                }
            }
            //...省略部分代码
        }
    }
    
    

    Looper轮询器调用loop()函数开始轮询,真正干活的是MessageQueue.next()函数,而next()函数中获取消息前首先调用了nativePollOnce(ptr, nextPollTimeoutMillis)函数,其意思是根据nextPollTimeoutMillis判断是否进行阻塞,nextPollTimeoutMillis初始化时默认为0表表示不阻塞。

    nextPollTimeoutMillis有三种对应的状态:

    • nextPollTimeoutMillis=0 ,不阻塞
    • nextPollTimeoutMillis<0 ,一直阻塞
    • nextPollTimeoutMillis>0 ,阻塞对应时长,可被新消息唤醒

    MessageQueue轮询获取Message时有两种阻塞情况:

    • 当轮询MessageQueue时获取获取不到Message,nextPollTimeoutMillis赋值为-1进行阻塞。
    • 当轮询MessageQueue时获取获取到Message,msg.when大于当前时间,时间差值就是阻塞的时长。

    Handler的阻塞

    在MessageQueue类中,有几个Native层的函数

        private native static long nativeInit();
        private native static void nativeDestroy(long ptr);
        private native void nativePollOnce(long ptr, int timeoutMillis); /*non-static for callbacks*/
        private native static void nativeWake(long ptr);
        private native static boolean nativeIsPolling(long ptr);
        private native static void nativeSetFileDescriptorEvents(long ptr, int fd, int events);
    
    

    在MessageQueue的构造函数中,就调用nativeInit()函数进行初始化。

    MessageQueue(boolean quitAllowed) {
          mQuitAllowed = quitAllowed;
          mPtr = nativeInit();
     }
    
    

    也就是说在创建MessageQueue对象时,通过JNI调用native层的android_os_MessageQueue_nativeInit()函数进行初始化。进入源码看看里面做了那些事情。

    //android_os_MessageQueue.cpp
    static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) {
        // 创建一个与java对应的MessageQueue
        NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();
        if (!nativeMessageQueue) {
            jniThrowRuntimeException(env, "Unable to allocate native queue");
            return 0;
        }
    
        nativeMessageQueue->incStrong(env);
        return reinterpret_cast<jlong>(nativeMessageQueue);//返回到Java层
    }
    
    NativeMessageQueue::NativeMessageQueue() :
        mPollEnv(NULL), mPollObj(NULL), mExceptionObj(NULL) {
        mLooper = Looper::getForThread();//获取该线程关联的Looper
        if (mLooper == NULL) {
            mLooper = new Looper(false);// 创建一个Looper对象
            Looper::setForThread(mLooper);// 将Looper保存到线程里,也相当于Java中用ThreadLocal保存Looper一样
        }
    }
    //Looper.cpp
    Looper::Looper(bool allowNonCallbacks) :
        mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false),
        mPolling(false), mEpollFd(-1), mEpollRebuildRequired(false),
        mNextRequestSeq(0), mResponseIndex(0), mNextMessageUptime(LLONG_MAX) {
        mWakeEventFd = eventfd(0, EFD_NONBLOCK);// 创建一个唤醒事件fd
        AutoMutex _l(mLock);
        rebuildEpollLocked();// 重构epoll事件
    }
    
    #include<sys/eventfd.h>  
    int eventfd(unsigned int initval,int flags);
    
    void Looper::rebuildEpollLocked() {
        if (mEpollFd >= 0) {
            close(mEpollFd);//关闭epoll
        }
        // 创建一个epoll实例
        mEpollFd = epoll_create(EPOLL_SIZE_HINT);
        struct epoll_event eventItem;
        //重新设置
        memset(& eventItem, 0, sizeof(epoll_event)); 
        eventItem.events = EPOLLIN;// 监听可读事件
        eventItem.data.fd = mWakeEventFd;//唤醒事件fd
        // 注册唤醒事件mWakeEventFd到epoll实例中
        int result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeEventFd, & eventItem);
        // 将请求中的事件注册到epoll实例中
        for (size_t i = 0; i < mRequests.size(); i++) {
            const Request& request = mRequests.valueAt(i);
            struct epoll_event eventItem;
            // 初始化请求事件
            request.initEventItem(&eventItem);
            // 注册请求中的事件到epoll实例中
            int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, request.fd, & eventItem);
            if (epollResult < 0) {
                ALOGE("Error adding epoll events for fd %d while rebuilding epoll set, errno=%d",
                        request.fd, errno);
            }
        }
    }
    
    

    在调用android_os_MessageQueue_nativeInit()函数做了些什么事情:

    • 创建一个NativeMessageQueue对象与Jave层相对应的MessageQueue对象。
    • 获取一个MessageQueue相对应Looper对象,没有获取则创建一个Looper 对象。
    • 如果创建Looper对象,则保存Looper对应的线程。

    了解了nativeInit()层初始化时后,其实也不难明白,说穿了就是构建一个Java层一样的处理方式而已。

    值得注意的是 eventfd(0, EFD_NONBLOCK)这句代码,它是什么意思呢。eventfd函数会创建一个事件描述符对象,通过IO多路复用机制epoll可以监听事件描述符,实现进程(线程)间的等待(wait)/通知(notify),对就这么简单。

    eventfd()函数的标志参数有两种:

    • EFD_NONBLOCK:设置对象为非阻塞状态。
    • EFD_CLOEXEC:调用exec后会自动关闭文件描述符,防止泄漏。

    值得注意的是 rebuildEpollLocked()函数,在函数里,创建了epoll实例,向epoll中注册唤醒监听事件和请求监听事件。其实通过Linux系统的epoll机制,来实现线程间的等待与唤醒操作。好可以简单理解为epoll就是监听内容的读写变化,来实现阻塞或唤醒操作。

    nativePollOnce()函数

    MassgeQueue#nativePollOnce()函数是Native层调用的,那么我们进行Native层,通过源码看看他的调用流程。

    static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,
        jlong ptr, jint timeoutMillis) {
        //获取NativeMessageQueue
        NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
        nativeMessageQueue->pollOnce(env, obj, timeoutMillis);// 调用pollOnce()函数
    }
    
    void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {
        mPollEnv = env;
        mPollObj = pollObj;
        mLooper->pollOnce(timeoutMillis);// 调用Looper#pollOnce()函数
        mPollObj = NULL;
        mPollEnv = NULL;
    
        if (mExceptionObj) {//异步处理
            env->Throw(mExceptionObj);
            env->DeleteLocalRef(mExceptionObj);
            mExceptionObj = NULL;
        }
    }
    
    //Looper.cpp
    inline int pollOnce(int timeoutMillis) {
        return pollOnce(timeoutMillis, NULL, NULL, NULL); 
    }
    
    int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
        int result = 0;
        for (;;) {
            // 一个循环不断处理响应列表中的事件。
            while (mResponseIndex < mResponses.size()) {
                const Response& response = mResponses.itemAt(mResponseIndex++);
                int ident = response.request.ident;
                if (ident >= 0) {
                    int fd = response.request.fd;
                    int events = response.events;
                    void* data = response.request.data;
                    if (outFd != NULL) *outFd = fd;
                    if (outEvents != NULL) *outEvents = events;
                    if (outData != NULL) *outData = data;
                    return ident;
               }
            }
    
            if (result != 0) {
                if (outFd != NULL) *outFd = 0;
                if (outEvents != NULL) *outEvents = 0;
                if (outData != NULL) *outData = NULL;
                return result;
            }
            result = pollInner(timeoutMillis);// 内部轮询
         }
    }
    
    //循环处理内部事件
    int Looper::pollInner(int timeoutMillis) {
    
        if (timeoutMillis != 0 && mNextMessageUptime != LLONG_MAX) {
            //记录当前时间
            nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
    
            int messageTimeoutMillis = toMillisecondTimeoutDelay(now, mNextMessageUptime);
            if (messageTimeoutMillis >= 0
                    && (timeoutMillis < 0 || messageTimeoutMillis < timeoutMillis)) {
                timeoutMillis = messageTimeoutMillis;
            }
        }
    
        // 事件处理类型为POLL_WAKE  唤醒
        int result = POLL_WAKE;
        mResponses.clear();
        mResponseIndex = 0;
        mPolling = true;//正在轮询
        struct epoll_event eventItems[EPOLL_MAX_EVENTS];
    
        //等待事件
        int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
        //不要轮询啦
        mPolling = false;
        //获取锁
        mLock.lock();
    
        //如果需要,重新生成epoll集。
        if (mEpollRebuildRequired) {
            mEpollRebuildRequired = false;
            rebuildEpollLocked();
            goto Done;
        }
    
        //POLL_ERROR Poll错误
        if (eventCount < 0) {
            if (errno == EINTR) {
                goto Done;
            }
            ALOGW("Poll failed with an unexpected error, errno=%d", errno);
            result = POLL_ERROR;
            goto Done;
        }
    
        //POLL_TIMEOUT Poll超时
        if (eventCount == 0) {
            result = POLL_TIMEOUT;
            goto Done;
        }
    
        //循环处理所有事件
        for (int i = 0; i < eventCount; i++) {
            int fd = eventItems[i].data.fd;//获取文件描述符
            uint32_t epollEvents = eventItems[i].events;//获取事件
            if (fd == mWakeEventFd) {//如果是唤醒事件
                if (epollEvents & EPOLLIN) {
                    awoken();//唤醒
                } else {
                    ALOGW("Ignoring unexpected epoll events 0x%x on wake event fd.", epollEvents);
                }
            } else {//处理请求队列中的事件
                ssize_t requestIndex = mRequests.indexOfKey(fd);
                if (requestIndex >= 0) {
                    int events = 0;
                    if (epollEvents & EPOLLIN) events |= EVENT_INPUT;
                    if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT;
                    if (epollEvents & EPOLLERR) events |= EVENT_ERROR;
                    if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP;
                    //请求事件添加到Response数组中
                    pushResponse(events, mRequests.valueAt(requestIndex));
                } else {
                    ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "
                            "no longer registered.", epollEvents, fd);
                }
            }
        }
        Done: ;
    
        //处理MessageEnvelopes的事件
        mNextMessageUptime = LLONG_MAX;
        while (mMessageEnvelopes.size() != 0) {
            nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
            const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);
            // 如果消息的处理时间小于当前时间,则将从列表中移除
            if (messageEnvelope.uptime <= now) {
                { // obtain handler
                    sp<MessageHandler> handler = messageEnvelope.handler;//处理消息的handler
                    Message message = messageEnvelope.message;//获取消息
                    mMessageEnvelopes.removeAt(0);
                    mSendingMessage = true;
                    mLock.unlock();//释放锁
    
                    handler->handleMessage(message);//处理消息
                } // release handler
    
                mLock.lock();//加上锁
                mSendingMessage = false;
                result = POLL_CALLBACK;//事件处理类型为POLL_CALLBACK
            } else {
                mNextMessageUptime = messageEnvelope.uptime;//更新下一个消息的处理时间
                break;
            }
        }
    
        //释放锁
        mLock.unlock();
    
        //处理所有Responses事件
        for (size_t i = 0; i < mResponses.size(); i++) {
            Response& response = mResponses.editItemAt(i);
            // 如果响应类型为POLL_CALLBACK
            if (response.request.ident == POLL_CALLBACK) {
                int fd = response.request.fd;
                int events = response.events;
                void* data = response.request.data;
                //调用callback类型的handleEvent方法
                int callbackResult = response.request.callback->handleEvent(fd, events, data);
                if (callbackResult == 0) {
                    removeFd(fd, response.request.seq);
                }
                response.request.callback.clear();
                result = POLL_CALLBACK;//事件处理类型为POLL_CALLBACK
            }
        }
        return result;
    }
    
    void Looper::awoken() {
        uint64_t counter;
        TEMP_FAILURE_RETRY(read(mWakeEventFd, &counter, sizeof(uint64_t)));
    }
    
    

    从java层调用Native层的nativePollOnce()函数,通过分析整理得出调用的主路线:nativePollOnce() -> NativeMessageQueue.pollOnce() -> Looper.pollOnce() -> Looper.pollInner()

    注意Java传入的mPtr变量,mPtr变量保存了一个NativeMessageQueue对象,从而使得MessageQueue成为Java层和Native层的连接桥梁,使得Java层与native层就可以相互处理消息了。

    从源码的过程得知,消息的处理都放在Looper.pollInner()函数中处理了,我们重点分析pollInner()函数。

    1、首先是参数timeoutMillis(从Java传入的nextPollTimeoutMillis)做相应的处理,而timeoutMillis有三状态:

    • 等于0,马上返回,也就是没有阻塞。
    • 小于0,一直阻塞,直接向消息队列中添加消息,才处理事件。
    • 大于0,时间差值是多少,那么就等待多久,时间到,才处理事件。

    2、调用epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis)获取到等待事件eventCount 。

    eventCount 也有三种状态:

    • 等于0,返回result =POLL_TIMEOUT,表示在超时之前,要准备的数据没有准备好,即为等待超时。
    • 小于0,如果errno == EINTR,返回result =POLL_WAKE,否则返回result = POLL_ERROR,发生了错误。
    • 大于0,所有数据都准备好了。如果返回result =POLL_WAKE,通过wake()方法唤醒的。如果返回result = POLL_CALLBACK,在一个或多个文件描述符被触发了。

    从eventCount 这三种情况,pollInner()函数返回的值就是pollOnce()函数的返回值,就是POLL_WAKE、POLL_TIMEOUT、POLL_ERROR、POLL_CALLBACK四种状态。

    3、获取到等待事件,首先处理eventCount 的事件,也就是处理传入epoll_wait()函数的等待事件。如果fd == mWakeEventFd,并且 epollEvents & EPOLLIN,说明有消息队列中有新的消息要处理,调用awoken()函数,它只把管道中内容读取出来,清空管道,方便下一次调用epoll_wait()函数时,再次阻塞等待。

    4、接着处理MessageEnvelopes事件,如果消息的处理时间小于当前时间,则将从消息列表中移除此消息,并且消费掉这条消息,最后更新下一个消息的处理时间。

    5、最后处理Responses事件,如果响应类型为POLL_CALLBACK,然后处理这个事件,如果callbackResult等于0,则移除文件描述符,最后也callback.clear()掉。

    Handler的唤醒

    有Handler阻塞,那么肯定有Handler唤醒,对吧,什么时候唤醒呢,由于一开始时,MessageQueue是没有Message的,从而进入了阻塞等待,如果向MessageQueue插入一条消息呢,是不是会唤醒啊。其实消息从消息队列中全部移除quit()时,可能需要调用nativeWake方法。那么我们从MessageQueue.enqueueMessage()函数入手吧。

    //MessageQueue.java
    boolean enqueueMessage(Message msg, long when) {
         //...省略部分代码
            if (needWake) {//调用Native的唤醒机制
                nativeWake(mPtr);
            }
        //...省略部分代码
        return true;
    }
    
    

    唤醒是有两种情况:

    • 当消息队列中没有消息时,Handler发送一条新消息到消息队列中,那么就会调用Native层的nativeWake(mPtr)函数。
    • 当消息队列的消息的时间,与当前时间的比值,就是需要等待的时间,这个会传入Native层,时间到则自动唤醒。

    nativeWake()函数

    我们从调用Native层的nativeWake(mPtr)函数,通过源码去分析它们的调用过程吧。

    //android_os_MessageQueue.cpp
    static void android_os_MessageQueue_nativeWake(JNIEnv* env, jclass clazz, jlong ptr) {
        //获取到当前的消息队列
        NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
        nativeMessageQueue->wake();//调用NatvieMessageQueue的wake()函数
    }
    void NativeMessageQueue::wake() {
        mLooper->wake();//调用Looper的wake()函数
    }
    
    //Looper.cpp
    void Looper::wake() {
        uint64_t inc = 1;
        //向mWakeEventFd(唤醒文件描述符)写入字符
        ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd, &inc, sizeof(uint64_t)));
        if (nWrite != sizeof(uint64_t)) {
            if (errno != EAGAIN) {
            ALOGW("Could not write wake signal, errno=%d", errno);
            }
        }
    }
    
    

    看到了Nativen层的唤醒调用过程是不是很简单。最后直接调用了TEMP_FAILURE_RETRY函数,其他就是向管道中写入数据,管道监听到有数据写入就是唤醒Android应用程序主线程处理事件。关于Linux系统的epoll机制,管道,涉及到了Binder相关知识,就不展开讨论了。

    Handler的同步屏障

    什么是同步屏障

    同步屏障:阻碍同步消息,优先执行异步消息。

    为什么要设计同步屏障

    从上述代码的Handler构造函数可知,一般情况下,默认调用Handler(callback, false)构造函数,也是就是mAsynchronous = false,同时在Handler.enqueueMessage函数中msg.target已经赋值了当前的Handler对象,在MessageQueue.enqueueMessage中按执行时间顺序把Message插入到Message链表合适的位置。在调用MessageQueue.next函数获取Message时添加了synchronied锁,所以取消息的时候是互斥取消息,只能从头部取消息,也因为加消息是按照消息的执行的先后顺序进行。如果要优先立即执行某条Message时,按正常情况是要排队的,是没法做到立即执行,所以就引入了同步屏障。

    如何开启同步屏障

    我们通过MessageQueue.postSyncBarrier() 函数,是如何引入同步屏障的。源码如下:

    public int postSyncBarrier() {
        return postSyncBarrier(SystemClock.uptimeMillis());
    }
    
    private int postSyncBarrier(long when) {
        synchronized (this) {
            final int token = mNextBarrierToken++;
            final Message msg = Message.obtain();//从消息对象池中获取一条消息
            msg.markInUse();
            //将消息信息初始化赋值,注意这里并没有给target赋值,这是关键
            msg.when = when;
            msg.arg1 = token;
    
            Message prev = null;
            Message p = mMessages;
            if (when != 0) {
                //如果开启同步屏障的时间(假设记为T)T不为0,且当前的同步消息里有时间小于T,则prev也不为null 
                while (p != null && p.when <= when) {
                    prev = p;
                    p = p.next;
                }
            }
            //将 msg 按照时间顺序插入到 消息队列(链表)的合适位置 
            if (prev != null) { // invariant: p == prev.next
                msg.next = p;
                prev.next = msg;
            } else {
                msg.next = p;
                mMessages = msg;
            }
            return token;//返回一个序号,通过这个序号可以撤销屏障
        }
    }
    
    

    从上述开启同步屏障的源码中可以看出,Message 对象初始化时并没有给 target 赋值,也就是说向MessageQueue中插入一条target 为null标记的Message,相对于正常的enqueue操作,在Handler.enqueueMessage函数中Handler与msg.target绑定了 。同步屏障的Message特殊在于 target 为 null,并不会被消费,因为不会唤醒消息队列。

    在那里消费掉同步屏障的Message,上述代码:Looper.loop()函数中调用了MessageQueue.next()函数,那么我们再看一次next函数源码:

    Message next() {
        //...省略部分代码
        for (;;) {
            //...省略部分代码
            nativePollOnce(ptr, nextPollTimeoutMillis);//根据nextPollTimeoutMillis是阻塞还唤醒
            synchronized (this) {
                final long now = SystemClock.uptimeMillis();
                Message prevMsg = null;
                Message msg = mMessages;
                //target为null 说明这是一条同步屏障消息
                if (msg != null && msg.target == null) {
                    // Stalled by a barrier.  Find the next asynchronous message in the queue.
                    do {
                        prevMsg = msg;
                        msg = msg.next;
                    } while (msg != null && !msg.isAsynchronous());//如果是异步,则获取并消费这条消息
                }
            }
            //...省略部分代码   
        }
    }
    
    

    从next函数中可以看出,MessageQueue中的msg.target为null说明开启了同步屏障,同时是异步,那么Message则会优先处理,这就是同步屏障的作用(过滤和优先作用)。

    我们来一形象图,加深印象:

    image

    发送异步Message

    发送同步和异步Message都是在Handler的几个构造函数,可以传入async标志为true,这样构造的Handler发送的消息就是异步消息。

    public Handler(boolean async) {
        this(null, async);
    }
     public Handler(Callback callback, boolean async) {
         //...省略代码
     }
     public Handler(Looper looper, Callback callback, boolean async) {
         //...省略代码
     }
    //最终调用enqueueMessage函数,把消息插入到消息队列中
    private boolean enqueueMessage(MessageQueue queue, Message msg, long uptimeMillis) {
        //this就是Handler Message中持有一个Handler
        //为发送消息出队列交给handler处理埋下伏笔。
        msg.target = this;
        if (mAsynchronous) {//是否是异步信息
            msg.setAsynchronous(true);
        }
        return queue.enqueueMessage(msg, uptimeMillis);//调用消息队列的入队函数
    }
    
    

    当然我们也可以在创建Message时,调用Message.setAsynchronous(true)将消息设为异步。
    发送异步消息和发送同步消息一样,唯一区别就在于Asynchronous的设置即可。

    移除同步屏障

    有启动同步屏障,那么就有移除同步屏障,我们看MessageQueue.removeSyncBarrier()函数源码是怎么移除同步屏障的:

    public void removeSyncBarrier(int token) {
        synchronized (this) {
            Message prev = null;
            Message p = mMessages;
            //找到token对应的屏障
            while (p != null && (p.target != null || p.arg1 != token)) {
                prev = p;
                p = p.next;
            }
            final boolean needWake;
            //从消息链表中移除
            if (prev != null) {
                prev.next = p.next;
                needWake = false;
            } else {
                mMessages = p.next;
                needWake = mMessages == null || mMessages.target != null;
            }
            //回收Message到对象池中。
            p.recycleUnchecked();
            if (needWake && !mQuitting) {
                nativeWake(mPtr);//唤醒消息队列
            }
        }
    
    

    在启动同步屏障时,已经记录了token,然后通过token对应的屏障,从消息链表中移除,回收Message到对象池中。

    同步消息屏障的应用场景

    同步屏障在系统源码中有哪些使用场景呢?我们日常开发中也很少用到同步消息屏障,涉及到同步消息屏障一般都是Android系统中使用得最多了,比如我们的UI更新相关的消息,就是利用同步屏障发送异步消息,则会优先处理,从而达到马上刷新UI。既然知道了Android系统中刷新UI使用了异步消息,那么我们看看View的更新,draw()、requestLayout()、invalidate() 等函数都调用了。

    我们从View的绘制流程起始,View通过ViewRootImpl来绘制,ViewRootImpl调用到requestLayout()来完成View的绘制操作:知道这个流程,我们看ViewRootImpl.requestLayout()函数源码:

    @Override
    public void requestLayout() {
        if (!mHandlingLayoutInLayoutRequest) {
            checkThread();//检查是否在主线程
            mLayoutRequested = true;//mLayoutRequested 是否measure和layout布局。
            //重要函数
            scheduleTraversals();
        }
    }
    
    void scheduleTraversals() {
        if (!mTraversalScheduled) {
            mTraversalScheduled = true;
            //设置同步障碍,确保mTraversalRunnable优先被执行
            mTraversalBarrier = mHandler.getLooper().getQueue().postSyncBarrier();
            //内部通过Handler发送了一个异步消息
            mChoreographer.postCallback(
                    Choreographer.CALLBACK_TRAVERSAL, mTraversalRunnable, null);
            if (!mUnbufferedInputDispatch) {
                scheduleConsumeBatchedInput();
            }
            notifyRendererOfFramePending();
            pokeDrawLockIfNeeded();
        }
    }
    
    //移除同步屏障
    void unscheduleTraversals() {
        if (mTraversalScheduled) {
            mTraversalScheduled = false;
            mHandler.getLooper().getQueue().removeSyncBarrier(mTraversalBarrier);
            mChoreographer.removeCallbacks(
                Choreographer.CALLBACK_TRAVERSAL, mTraversalRunnable, null);
        }
    }
    
    

    调用 Handler.getLooper().getQueue().postSyncBarrier() 并设置同步屏障消息。

    最终调用Choreographer.postCallbackDelayedInternal()函数,在其函数中设置Message.setAsynchronous(true) 时 ,也就发送异步消息。

    private void postCallbackDelayedInternal(int callbackType,
                                             Object action, Object token, long delayMillis) {
        synchronized (mLock) {
            final long now = SystemClock.uptimeMillis();
            final long dueTime = now + delayMillis;
            mCallbackQueues[callbackType].addCallbackLocked(dueTime, action, token);
    
            if (dueTime <= now) {
                scheduleFrameLocked(now);
            } else {
                Message msg = mHandler.obtainMessage(MSG_DO_SCHEDULE_CALLBACK, action);//获取消息对象
                msg.arg1 = callbackType;
                msg.setAsynchronous(true);//设置为异步
                mHandler.sendMessageAtTime(msg, dueTime);//发送异步消息
            }
        }
    }
    
    

    调用Handler.getLooper().getQueue().removeSyncBarrier()函数移除同步屏障。最终调用Choreographer.removeCallbacksInternal()函数移除消息。

    private void removeCallbacksInternal(int callbackType, Object action, Object token) {
        synchronized (mLock) {
            mCallbackQueues[callbackType].removeCallbacksLocked(action, token);
            if (action != null && token == null) {
                //移除消息
                mHandler.removeMessages(MSG_DO_SCHEDULE_CALLBACK, action);
            }
        }
    }
    
    

    总结

    • 线程切换,在子线程发送Message,Message对象是存放在堆内存,获取到Message时,主线程dispatchMessage分发处理这条消息。
    • ThreadLocal的作用就是线程隔离,每个线程都提供一个变量的副本,使得每一个线程在同一时间内访问到的不同对象。
    • 当MessageQueue中没有Message时,触发Native层进入阻塞状态;或者MessageQueue中的Message更新时间没有到时,也进入了阻塞状态,阻塞时长就是它的更新时间。
    • MessageQueue中没有Message时,向MessageQueue中添加Message时,触发Native层的唤醒机制;MessageQueue中的Message更新时间到时,也触发唤醒。
    • Handler的同步屏障,就是优化处理系统的Message,并且Message是异步的。比如:Android应用程序出现ANR,系统发起同步屏障,发送一条异步消息,界面上优先处理这条消息了,然后系统弹出一个ANR消息对话框。

    相关文章

      网友评论

          本文标题:由浅入深全面分析Handler机制原理之源码<难点>

          本文链接:https://www.haomeiwen.com/subject/jlkvyktx.html