美文网首页
Handler机制源码之路(二)MessageQueue篇

Handler机制源码之路(二)MessageQueue篇

作者: 烤地瓜次不次 | 来源:发表于2019-02-18 14:34 被阅读0次

    Handler 基础了解

    1. Handler 是干嘛的

    说白了,Handler 就是 Android 提供的 线程之间交互 的一种方式,并在系统层面同样大量使用。

    2. Handler 的工作流程是怎样的

    当每个线程创建时,可选择 为该线程创建自己的循环队列 (Looper + MessageQueue) ,当 [线程B] 想发送消息给 [线程A] 时,只需要在 [线程B] 把消息推送到 [线程A]MessageQueue 中,[线程A] 就可以用 Looper 提取 MessageQueue 中的 Message

    3. 系统有哪些地方使用了 Handler

    • CountDownTimer
    • Messenger
    • AsyncTask
    • IntentService
    • View
    • LocalBroadcast

    4. Handler 机制所使用的主要类

    • Message
    • MessageQueue
    • Looper
    • Handler

    5. Handler 的基础使用方法

    // 1.在需要接受数据的线程创建Handler
    Handler handler = new Handler(){
        @Override
       public void handleMessage(Message msg) {
            // 4.这里接收发送的数据
       }
    }
    // 2.在需要发送数据的线程创建Message
    Message message = Message.obtain();__
    // 3.调用接收线程的handler发送消息
    handler.sendMessage(message);
    

    主要类源码分析

    1. Message

    Handler机制源码之路(一)Message篇

    2. MessageQueue

    MessageQueue 作为存放 Message 的容器。Handler机制中 最重要 的实现部分。

    2.1 继承关系

    // 无继承/无实现
    public final class MessageQueue {
    

    2.2 变量

    // 如果消息队列可以退出,则该值为true。MainLooper为不可退出,其他Looper默认可退出
    private final boolean mQuitAllowed;
    private boolean mQuitting;// 正在退出的标识位
    
    @SuppressWarnings("unused")
    private long mPtr; // used by native code C++对象指针
    
    Message mMessages; // 消息链表(有序,时间顺序)
    // 是否唤醒的锁标识,true则表示native层正在等待唤醒
    private boolean mBlocked;
    // 同步消息屏障
    private int mNextBarrierToken;
    
    // IdelHandler 空闲消息
    private final ArrayList<IdleHandler> mIdleHandlers = new ArrayList<IdleHandler>();
    private IdleHandler[] mPendingIdleHandlers;
    
    private SparseArray<FileDescriptorRecord> mFileDescriptorRecords;
    
    

    2.3 本地方法

    private native static long nativeInit();// 本地创建循环
    private native static void nativeDestroy(long ptr);// 本地销毁循环
    private native void nativePollOnce(long ptr, int timeoutMillis); /*non-static for callbacks*/// 本地拉去数据
    private native static void nativeWake(long ptr);// 本地唤醒
    private native static boolean nativeIsPolling(long ptr);// 本地判断是否在拉去
    private native static void nativeSetFileDescriptorEvents(long ptr, int fd, int events);
    

    native 方法具体实现的是什么功能呢,其实他主要是在 C++ 层面实现了一个 消息阻塞 的功能。内部是使用了Linux的 多路复用/epoll 实现的。具体可以参考这篇博客对我来说可能需要以后再找时间攻克这个C++的源码

    2.4 构造方法

    该方法在 Looper 的 perpare() 中被调用。

    // 可以看到,MessageQueue的构造方法是一个包权限
    MessageQueue(boolean quitAllowed) {
        mQuitAllowed = quitAllowed;
        mPtr = nativeInit();
    }
    

    通过对构造方法的调用位置查看可知,MessageQueue 跟随 Looper 的创建而创建。在插入时,使用 自身对象锁,保证消息的处理顺序。

    2.5 回收方法

    // finalize是Java提供的回收方法,当对象被GC回收时调用,这里用来做一些native释放
    @Override
    protected void finalize() throws Throwable {
        try {
            dispose();
        } finally {
            super.finalize();
        }
    }
    
    // Disposes of the underlying message queue.
    // Must only be called on the looper thread or the finalizer.
    // 这个注释说必须在回收的时候或者looper线程才能被调用
    private void dispose() {
        if (mPtr != 0) {
            nativeDestroy(mPtr);
            mPtr = 0;
        }
    }
    

    2.6 MessageQueue 的 hasMessage() 方法

    这几个方法主要是根据传值的不同,甄别不同的消息进行判断

    // 判断消息队列中是否含有指定what和object的消息
    boolean hasMessages(Handler h, int what, Object object) {
        if (h == null) {
            return false;
        }
    
        synchronized (this) {
            Message p = mMessages;
            while (p != null) {
                if (p.target == h && p.what == what && (object == null || p.obj == object)) {
                    return true;
                }
                p = p.next;
            }
            return false;
        }
    }
    // 判断消息队列中是否含有指定runnable和object的消息
    boolean hasMessages(Handler h, Runnable r, Object object) {
        if (h == null) {
            return false;
        }
    
        synchronized (this) {
            Message p = mMessages;
            while (p != null) {
                if (p.target == h && p.callback == r && (object == null || p.obj == object)) {
                    return true;
                }
                p = p.next;
            }
            return false;
        }
    }
    // 判断消息队列中是否含有指定Handler的消息
    boolean hasMessages(Handler h) {
        if (h == null) {
            return false;
        }
    
        synchronized (this) {
            Message p = mMessages;
            while (p != null) {
                if (p.target == h) {
                    return true;
                }
                p = p.next;
            }
            return false;
        }
    }
    

    2.7 MessageQueue 的 next() 方法

    该方法在 Looper 的 loop() 方法中被 循环调用

    // 依然是包权限
    Message next() {
        // 当loop已经结束的时候,直接返回null
        final long ptr = mPtr;
        if (ptr == 0) {
            return null;
        }
    
        int pendingIdleHandlerCount = -1; // -1 only during first iteration
        int nextPollTimeoutMillis = 0;
        for (;;) {
            if (nextPollTimeoutMillis != 0) {
            // 将当前线程中挂起的所有绑定器命令刷新到内核驱动程序。
            // 这对于在执行可能会阻塞很长时间的操作之前进行调用很有用,
            // 以确保已释放任何挂起的对象引用,以防止进程将对象保留的时间超过需要的时间。
                Binder.flushPendingCommands();
            }
    
            nativePollOnce(ptr, nextPollTimeoutMillis);
    
            synchronized (this) {
                // Try to retrieve the next message.  Return if found.
                final long now = SystemClock.uptimeMillis();
                Message prevMsg = null;
                Message msg = mMessages;
                if (msg != null && msg.target == null) {
                    // Stalled by a barrier.  Find the next asynchronous message in the queue.
                    // 如果有同步消息屏障,找到链表中第一个异步的Message
                    do {
                        prevMsg = msg;
                        msg = msg.next;
                    } while (msg != null && !msg.isAsynchronous());
                }
                if (msg != null) {
                    if (now < msg.when) {// 如果消息没到,则取最小的时间休眠,然后进入IdleHandler逻辑
                        // Next message is not ready.  Set a timeout to wake up when it is ready.
                        nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
                    } else {
                        // 返回链表头的Message
                        mBlocked = false;
                        if (prevMsg != null) {
                            prevMsg.next = msg.next;
                        } else {
                            mMessages = msg.next;
                        }
                        msg.next = null;
                        if (DEBUG) Log.v(TAG, "Returning message: " + msg);
                        msg.markInUse();
                        return msg;
                    }
                } else {
                    // No more messages.
                    nextPollTimeoutMillis = -1;
                }
    
                // 如果需要关闭队列
                if (mQuitting) {
                    dispose();
                    return null;
                };
                /// 下面的代码在 2.13 IdelHandler 机制 中分析
                /// ...
        }
    }
    

    2.8 MessageQueue 的 enqueueMessage() 方法

    该方法在 Handler 中被调用,作用是向消息队列中添加 Message。

    // 可以看到,也是包权限
    boolean enqueueMessage(Message msg, long when) {
        // 必须指定Handler
        if (msg.target == null) {
            throw new IllegalArgumentException("Message must have a target.");
        }
        // 判断Message是否被使用
        if (msg.isInUse()) {
            throw new IllegalStateException(msg + " This message is already in use.");
        }
    
        synchronized (this) {
            // 判断线程是否退出了
            if (mQuitting) {
                IllegalStateException e = new IllegalStateException(
                        msg.target + " sending message to a Handler on a dead thread");
                Log.w(TAG, e.getMessage(), e);
                msg.recycle();
                return false;
            }
            // 标记消息为被使用
            msg.markInUse();
            msg.when = when;
            Message p = mMessages;
            boolean needWake;
            if (p == null // 如果消息队列中没有消息
                    || when == 0 // 或需要立马执行的消息
                    || when < p.when) {// 或该消息需要执行的时间早于链表头所要执行的时间
                // 将该消息至于链表头,同时通知native函数需要唤醒
                msg.next = p;
                mMessages = msg;
                needWake = mBlocked;
            } else {
                // 如果是把数据加到链表的中间,通常我们不需要主动的通知唤醒,除非
                // 1.在消息头存在同步消息屏障,即该消息的handler为空
                // 2.这条消息需要异步执行
                
                needWake = mBlocked && p.target == null && msg.isAsynchronous();
                Message prev;
                // 从消息头开始遍历
                for (;;) {
                    prev = p;
                    p = p.next;
                    if (p == null || when < p.when) {
                        // 当到达消息尾或发现当前消息的时间早于遍历到的消息时间时
                        break;
                    }
                        // 如发现链表中有早于该消息的异步消息时,不用通知唤醒
                    if (needWake && p.isAsynchronous()) {
                        needWake = false;
                    }
                }
                msg.next = p; // invariant: p == prev.next
                prev.next = msg;
            }
    
            // 因为 mQuitting是false, 所以我们可以认为mPtr != 0
            if (needWake) {
                nativeWake(mPtr);
            }
        }
        return true;
    }
    

    该方法将 Message 根据时间添加到队列中,并根据逻辑来判断是否需要调用 唤醒 方法。

    2.9 MessageQueue 的 removeMessage() 方法

    2.9.1 移除特定消息

    根据传递的参数不同来移除特定消息

    void removeMessages(Handler h, int what, Object object) {
        if (h == null) {
            return;
        }
    
        synchronized (this) {
            Message p = mMessages;
    
            // 先确定链表头
            while (p != null && p.target == h && p.what == what
                   && (object == null || p.obj == object)) {
                Message n = p.next;
                mMessages = n;
                p.recycleUnchecked();
                p = n;
            }
    
            // 再过滤其他位置
            while (p != null) {
                Message n = p.next;
                if (n != null) {
                    if (n.target == h && n.what == what
                        && (object == null || n.obj == object)) {
                        Message nn = n.next;
                        n.recycleUnchecked();
                        p.next = nn;
                        continue;
                    }
                }
                p = n;
            }
        }
    }
    
    void removeMessages(Handler h, Runnable r, Object object) {
        if (h == null || r == null) {
            return;
        }
    
        synchronized (this) {
            Message p = mMessages;
    
            // Remove all messages at front.
            while (p != null && p.target == h && p.callback == r
                   && (object == null || p.obj == object)) {
                Message n = p.next;
                mMessages = n;
                p.recycleUnchecked();
                p = n;
            }
    
            // Remove all messages after front.
            while (p != null) {
                Message n = p.next;
                if (n != null) {
                    if (n.target == h && n.callback == r
                        && (object == null || n.obj == object)) {
                        Message nn = n.next;
                        n.recycleUnchecked();
                        p.next = nn;
                        continue;
                    }
                }
                p = n;
            }
        }
    }
    
    void removeCallbacksAndMessages(Handler h, Object object) {
            if (h == null) {
                return;
            }
    
            synchronized (this) {
                Message p = mMessages;
    
                // Remove all messages at front.
                while (p != null && p.target == h
                        && (object == null || p.obj == object)) {
                    Message n = p.next;
                    mMessages = n;
                    p.recycleUnchecked();
                    p = n;
                }
    
                // Remove all messages after front.
                while (p != null) {
                    Message n = p.next;
                    if (n != null) {
                        if (n.target == h && (object == null || n.obj == object)) {
                            Message nn = n.next;
                            n.recycleUnchecked();
                            p.next = nn;
                            continue;
                        }
                    }
                    p = n;
                }
            }
        }
    
    2.9.2 移除所有消息
    // 遍历回收所有的Message
    private void removeAllMessagesLocked() {
        Message p = mMessages;
        while (p != null) {
            Message n = p.next;
            p.recycleUnchecked();
            p = n;
        }
        mMessages = null;
    }
    //使loop()方法在处理完消息队列中所有要传递的剩余消息后立即终止。但是,在循环终止之前,将不会传递将来具有到期时间的挂起延迟消息。
    private void removeAllFutureMessagesLocked() {
        final long now = SystemClock.uptimeMillis();
        Message p = mMessages;
        if (p != null) {
            if (p.when > now) {// 如果链表头的时间为未来时
                removeAllMessagesLocked();// 正常移除所有消息
            } else {
                Message n;
                for (;;) {
                    n = p.next;// 获取下一次需要传递的消息
                    if (n == null) {
                        return;
                    }
                    if (n.when > now) {// 下一次为未来时
                        break;
                    }
                    p = n;// 如果为过去时,则把它赋值给p
                }
                /// 循环结束则找到了所有过去时 的最后一个消息
                /// 此时,p为倒数第二个过去时消息,n为最后一个过去时消息
                p.next = null;
                /// 移除所有未来时的消息
                do {
                    p = n;
                    n = p.next;
                    p.recycleUnchecked();
                } while (n != null);
            }
        }
    }
    

    2.10 MessageQueue 的 quit() 方法

    // 参数 safe 表示是否要移除当前队列中已到时间但没有来得及处理的消息
    // true 表示不移除,false 表示全部移除
    void quit(boolean safe) {
        if (!mQuitAllowed) {
            throw new IllegalStateException("Main thread not allowed to quit.");
        }
    
        synchronized (this) {
            if (mQuitting) {
                return;
            }
            mQuitting = true;
    
            if (safe) {
                removeAllFutureMessagesLocked();
            } else {
                removeAllMessagesLocked();
            }
    
            // We can assume mPtr != 0 because mQuitting was previously false.
            nativeWake(mPtr);
        }
    }
    

    2.11 MessageQueue 的 isPolling() 方法

    该方法可用于准确的判断 queue 是否处于轮询中(是否被阻塞)

    public boolean isPolling() {
        synchronized (this) {
            return isPollingLocked();
        }
    }
    
    private boolean isPollingLocked() {
        // If the loop is quitting then it must not be idling.
        // We can assume mPtr != 0 when mQuitting is false.
        return !mQuitting && nativeIsPolling(mPtr);
    }
    

    2.12 MessageQueue 的 dump() 方法

    // 对当前MessageQueue状态的倾泻打印
    void dump(Printer pw, String prefix, Handler h) {
        synchronized (this) {
            long now = SystemClock.uptimeMillis();
            int n = 0;
            for (Message msg = mMessages; msg != null; msg = msg.next) {
                if (h == null || h == msg.target) {
                    pw.println(prefix + "Message " + n + ": " + msg.toString(now));
                }
                n++;
            }
            pw.println(prefix + "(Total messages: " + n + ", polling=" + isPollingLocked()
                    + ", quitting=" + mQuitting + ")");
        }
    }
    

    2.13 MessageQueue 的 同步消息屏障方法

    2.13.1 postSyncBarrier() 方法
    public int postSyncBarrier() {
        return postSyncBarrier(SystemClock.uptimeMillis());
    }
        // 向消息队列中插入一个handler为空的Message
    private int postSyncBarrier(long when) {
        // Enqueue a new sync barrier token.
        // We don't need to wake the queue because the purpose of a barrier is to stall it.
        synchronized (this) {
            final int token = mNextBarrierToken++;
            final Message msg = Message.obtain();
            msg.markInUse();
            msg.when = when;
            msg.arg1 = token;// 将arg1 属性复制为token
    
            Message prev = null;
            Message p = mMessages;
            if (when != 0) {
                while (p != null && p.when <= when) {
                    prev = p;
                    p = p.next;
                }
            }
            if (prev != null) { // invariant: p == prev.next
                msg.next = p;
                prev.next = msg;
            } else {
                msg.next = p;
                mMessages = msg;
            }
            return token;
        }
    }
    
    
    2.13.2 removeSyncBarrier() 方法
    // 根据arg1与token是否相等来移除同步消息屏障,重新nativeWake
    public void removeSyncBarrier(int token) {
        // Remove a sync barrier token from the queue.
        // If the queue is no longer stalled by a barrier then wake it.
        synchronized (this) {
            Message prev = null;
            Message p = mMessages;
            while (p != null && (p.target != null || p.arg1 != token)) {
                prev = p;
                p = p.next;
            }
            if (p == null) {
                throw new IllegalStateException("The specified message queue synchronization "
                        + " barrier token has not been posted or has already been removed.");
            }
            final boolean needWake;
            if (prev != null) {
                prev.next = p.next;
                needWake = false;
            } else {
                mMessages = p.next;
                needWake = mMessages == null || mMessages.target != null;
            }
            p.recycleUnchecked();
    
            // If the loop is quitting then it is already awake.
            // We can assume mPtr != 0 when mQuitting is false.
            if (needWake && !mQuitting) {
                nativeWake(mPtr);
            }
        }
    }
    
    

    2.14 MessageQueue 的 IdleHandler 机制

    当 MessageQueue 中的消息都被响应后,MessageQueue 进入 Idle 状态,此时可执行 mIdleHandlers 中的消息

    2.14.1 MessageQueue 的 IdleHandler 接口
    public static interface IdleHandler {
        // 返回false则从IdleHandlers中移除
       boolean queueIdle();
    }
    
    2.14.2 MessageQueue 的 isIdle() 方法
    // 消息列表为空或者消息列表要执行的时间没到
    public boolean isIdle() {
        synchronized (this) {
            final long now = SystemClock.uptimeMillis();
            return mMessages == null || now < mMessages.when;
        }
    }
    
    2.14.3 MessageQueue 的 addIdleHandler()/removeIdleHandler() 方法
    // 添加方法
    public void addIdleHandler(@NonNull IdleHandler handler) {
        if (handler == null) {
            throw new NullPointerException("Can't add a null IdleHandler");
        }
        synchronized (this) {
            mIdleHandlers.add(handler);
        }
    }
    // 移除方法
    public void removeIdleHandler(@NonNull IdleHandler handler) {
        synchronized (this) {
            mIdleHandlers.remove(handler);
        }
    }
    
    2.14.4 next() 方法中对 IdleHanlder 的使用

    在同步代码块中,将List转换为Array

    // If first time idle, then get the number of idlers to run.
    // Idle handles only run if the queue is empty or if the first message
    // in the queue (possibly a barrier) is due to be handled in the future.
    if (pendingIdleHandlerCount < 0
            && (mMessages == null || now < mMessages.when)) {
        pendingIdleHandlerCount = mIdleHandlers.size();
    }
    if (pendingIdleHandlerCount <= 0) {
        // No idle handlers to run.  Loop and wait some more.
        mBlocked = true;
        continue;
    }
    
    if (mPendingIdleHandlers == null) {
        mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)];// 最大数量控制在4以内
    }
    mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers);
    

    然后循环执行 Array 中的 IdleHandler,执行后从 List 中移除

    // Run the idle handlers.
    // We only ever reach this code block during the first iteration.
    for (int i = 0; i < pendingIdleHandlerCount; i++) {
        final IdleHandler idler = mPendingIdleHandlers[i];
        mPendingIdleHandlers[i] = null; // release the reference to the handler
    
        boolean keep = false;
        try {
            keep = idler.queueIdle();
        } catch (Throwable t) {
            Log.wtf(TAG, "IdleHandler threw exception", t);
        }
    
        if (!keep) {
            synchronized (this) {
                mIdleHandlers.remove(idler);
            }
        }
    }
    
    // Reset the idle handler count to 0 so we do not run them again.
    pendingIdleHandlerCount = 0;
    
    // While calling an idle handler, a new message could have been delivered
    // so go back and look again for a pending message without waiting.
    nextPollTimeoutMillis = 0;
    

    2.15 MessageQueue 的 FileDescriptor 机制

    暂不知道这个是什么意思,是写日志或是?

    2.16 总结

    至此,MessageQueue 类的900多行代码,除了 FileDescriptor 部分就看完。这个类在整个 Handler 机制中属于最核心的处理部分。提供了对整个链表消息队列的增删查以及管理操作。最主要的:休眠唤醒采用了 native 代码调用 linux 部分实现。同时也提供了不少细节上的小用法,比如:同步消息屏障 ,IdleHandler 等。

    其实接下来的 Looper 和 Handler 就比较简单了,就是基于 MessageQueue 对上层做出的封装。只要理解了 MessageQueue 的工作原理,到了 Handler 和 Looper 的源码部分,更要关注的是整个Handler机制是如何在整个Android系统中使用的。

    3. Looper

    Handler机制源码之路(三)Looper篇

    4. Handler

    Handler机制源码之路(四)Handler篇

    Handler应用类拓展阅读

    1.CountDownTimer

    Handler机制应用之CountDownTimer篇

    相关文章

      网友评论

          本文标题:Handler机制源码之路(二)MessageQueue篇

          本文链接:https://www.haomeiwen.com/subject/irpaeqtx.html