美文网首页
Android-MessageQueue

Android-MessageQueue

作者: 有腹肌的豌豆Z | 来源:发表于2020-09-25 08:46 被阅读0次

    一、MessageQueue简介

    • MessageQueue即消息队列,这个消息队列和Message消息对象对象池里面的消息对象池(单链表)可不是同一个东西。
    • MessageQueue是一个消息队列,Handler将Message发送到消息队列中,消息队列会按照一定的规则取出要执行的Message。
    • 需要注意的是Java层的MessageQueue负责处理Java的消息,native也有一个MessageQueue负责处理native的消息。
    • 这里有必要提一下MessageQueue的数据结构,是一个单向链表,Message对象有个next字段保存列表中的下一个,MessageQueue中的mMessages保存链表的第一个元素。
    • 用于将消息插入和读取
    • 通过一个单链表的数据结构,维护消息列表

    二、MessageQueue类注释

    /**
     * Low-level class holding the list of messages to be dispatched by a
     * {@link Looper}.  Messages are not added directly to a MessageQueue,
     * but rather through {@link Handler} objects associated with the Looper.
     *
     * <p>You can retrieve the MessageQueue for the current thread with
     * {@link Looper#myQueue() Looper.myQueue()}.
     */
    public final class MessageQueue {
        ....
    }
    

    它是一个被Looper分发、低等级的持有Message集合的类。Message并不是直接加到MessageQueue的,而是通过Handler对象和Looper关联到一起。
    我们可以通过Looper.myQueue()方法来检索当前线程的MessageQueue
    它是一个低等级的持有Messages集合的类,被Looper分发。Messages并不是直接加到MessageQueue的,而是通过Handler对象和Looper关联到一起。我们可以通过Looper.myQueue()方法来检索当前线程的。

    三、MessageQueue成员变量

        // True if the message queue can be quit.
        //用于标示消息队列是否可以被关闭,主线程的消息队列不可关闭
        private final boolean mQuitAllowed;
    
        @SuppressWarnings("unused")
        // 该变量用于保存native代码中的MessageQueue的指针
        private long mPtr; // used by native code
    
        //在MessageQueue中,所有的Message是以链表的形式组织在一起的,该变量保存了链表的第一个元素,也可以说它就是链表的本身
        Message mMessages;
    
        //当Handler线程处于空闲状态的时候(MessageQueue没有其他Message时),可以利用它来处理一些事物,该变量就是用于保存这些空闲时候要处理的事务
        private final ArrayList<IdleHandler> mIdleHandlers = new ArrayList<IdleHandler>();
    
        // 注册FileDescriptor以及感兴趣的Events,例如文件输入、输出和错误,设置回调函数,最后
        // 调用nativeSetFileDescriptorEvent注册到C++层中,
        // 当产生相应事件时,由C++层调用Java的DispathEvents,激活相应的回调函数
        private SparseArray<FileDescriptorRecord> mFileDescriptorRecords;
    
         // 用于保存将要被执行的IdleHandler
        private IdleHandler[] mPendingIdleHandlers;
    
        //标示MessageQueue是否正在关闭。
        private boolean mQuitting;
    
        // Indicates whether next() is blocked waiting in pollOnce() with a non-zero timeout.
        // 标示 MessageQueue是否阻塞
        private boolean mBlocked;
    
        // The next barrier token.
        // Barriers are indicated by messages with a null target whose arg1 field carries the token.
        // 在MessageQueue里面有一个概念叫做障栅,它用于拦截同步的Message,阻止这些消息被执行,
        // 只有异步Message才会放行。障栅本身也是一个Message,只是它的target为null并且arg1用于区分不同的障栅,
         // 所以该变量就是用于不断累加生成不同的障栅。
        private int mNextBarrierToken;
    
    

    四、MessageQueue的构造函数

     MessageQueue(boolean quitAllowed) {
            mQuitAllowed = quitAllowed;
            mPtr = nativeInit();
        }
    

    MessageQueue只是有一个构造函数,该构造函数是包内可见的,其内部就两行代码,分别是设置了MessageQueue是否可以退出和native层代码的相关初始化。

    调用的地方Looper#Looper(boolean quitAllowed)
        private Looper(boolean quitAllowed) {
            mQueue = new MessageQueue(quitAllowed); // 消息队列
            mThread = Thread.currentThread();       // 当前线程
        }
    

    在Looper中创建了MessageQueue。


    MessageQueue处理消息

    消息入队 enqueueMessage()

    这个方法主要是用来处理发送消息的,当Handler通过自己enqueueMessage()将消息发送到这该函数中。该函数首先会判断判断是否msg.target有Handler的引用,消息会被按着时间顺序被添加到队列中。

    Handler#enqueueMessage()

        /**
         * 将Msg添加到消息队列中
         *
         * @param queue 这个就是消息队列 从Looper里面获取到的
         * @param msg    发送的具体消息
         * @param uptimeMillis 处理时间
         * @return
         */
        private boolean enqueueMessage(@NonNull MessageQueue queue, @NonNull Message msg, long uptimeMillis) {
            // Handler 保存进来了 looper的时候使用这个Handler 处理消息的
            // 为什么使用匿名内部类来创建Handler的方法会有内存泄漏的风险?
            msg.target = this;
            // 线程数据
            msg.workSourceUid = ThreadLocalWorkSource.getUid();
            // 异步
            if (mAsynchronous) {
                msg.setAsynchronous(true);
            }
            // 调用的是 Looper 的 Queen 的函数
            return queue.enqueueMessage(msg, uptimeMillis);
        }
    

    MessageQueue#enqueueMessage(Message msg, long when)

    
    boolean enqueueMessage(Message msg, long when) {
            // msg 必须有target也就是必须有handler
            if (msg.target == null) {
                throw new IllegalArgumentException("Message must have a target.");
            }
            if (msg.isInUse()) {
                throw new IllegalStateException(msg + " This message is already in use.");
            }
    
            // 锁定一下 
            synchronized (this) {
                // 标示MessageQueue是否正在关闭。
                if (mQuitting) {
                    IllegalStateException e = new IllegalStateException(
                            msg.target + " sending message to a Handler on a dead thread");
                    Log.w(TAG, e.getMessage(), e);
                    msg.recycle();
                    return false;
                }
    
                msg.markInUse();
                //when 表示这个消息执行的时间,队列是按照消息执行时间排序的
                msg.when = when;
                Message p = mMessages;
                boolean needWake;
                if (p == null || when == 0 || when < p.when) {
                    // 如果p为null则表示当前消息队列没有消息
                    // New head, wake up the event queue if blocked.
                    msg.next = p;
                    // 初始化头消息(改变了总是指向新的消息)
                    mMessages = msg;
                     // true代表无消息,阻塞线程,false代表有消息,没有阻塞线程
                    needWake = mBlocked;
                } else {
                    // Inserted within the middle of the queue.  Usually we don't have to wake
                    // up the event queue unless there is a barrier at the head of the queue
                    // and the message is the earliest asynchronous message in the queue.
                    needWake = mBlocked && p.target == null && msg.isAsynchronous();
                    Message prev;
                    // 将消息放到队列,消息是按照msg的when 排序
                    for (;;) {
                        prev = p;
                        p = p.next;
                        if (p == null || when < p.when) {
                            break;
                        }
                        if (needWake && p.isAsynchronous()) {
                            needWake = false;
                        }
                    }
                    msg.next = p; // invariant: p == prev.next
                    prev.next = msg;
                }
    
                // We can assume mPtr != 0 because mQuitting is false.
                if (needWake) {
                    nativeWake(mPtr);
                }
            }
            return true;
        }
    
    消息轮询 next()

    Looper.loop() 调用 Message msg = queue.next();

    public static void loop() {
           
           ....
    
            // 进入loop的主循环方法
            for (; ; ) {
                Message msg = queue.next(); // might block 可能会阻塞,因为next()方法可能会无限循环
    
                ....
    
                // 回收
                msg.recycleUnchecked();
            }
    }
    
    

    next()

     @UnsupportedAppUsage
        Message next() {
    
            //nextPollTimeoutMillis 表示nativePollOnce方法需要等待的时间
            //nextPollTimeoutMillis=-1表示一直阻塞切不会超时
            //nextPollTimeoutMillis>0 表示阻塞时长,可以理解为延迟多长时间发送消息
    
            // Return here if the message loop has already quit and been disposed.
            // This can happen if the application tries to restart a looper after quit
            // which is not supported.
            final long ptr = mPtr;
            if (ptr == 0) {
                return null;
            }
    
            // //表示不会阻塞,立即执行
            int pendingIdleHandlerCount = -1; // -1 only during first iteration
            int nextPollTimeoutMillis = 0;
            
            for (;;) {
                if (nextPollTimeoutMillis != 0) {
                    Binder.flushPendingCommands();
                }
    
                //native的方法,在没有消息的时候会阻塞管道读取端,只有nativePollOnce返回之后才能往下执行
                nativePollOnce(ptr, nextPollTimeoutMillis);
    
                synchronized (this) {
                    //从开机到现在的毫秒数
                    // Try to retrieve the next message.  Return if found.
                    final long now = SystemClock.uptimeMillis();
                    
                    Message prevMsg = null;
                    Message msg = mMessages;
                    if (msg != null && msg.target == null) {
                        // 找不是异步而且msg.target不为空的message
                        // Stalled by a barrier.  Find the next asynchronous message in the queue.
                        do {
                            prevMsg = msg;
                            msg = msg.next;
                        } while (msg != null && !msg.isAsynchronous());
                    }
                    if (msg != null) {
                        //开机到现在的毫秒数如果小于msg.when则代表还未到发送消息的时间
                        if (now < msg.when) {
                            // Next message is not ready.  Set a timeout to wake up when it is ready.
                            // 虽然有消息,但是还没有到运行的时候
                            //计算还有等待多久,并赋值给nextPollTimeoutMillis
                            nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
                        } else {
                            //代表当前没有阻塞
                            // Got a message.
                            mBlocked = false;
    
                            // 获取msg并且删除该节点 
                            if (prevMsg != null) {
                                prevMsg.next = msg.next;
                            } else {
                                mMessages = msg.next;
                            }
                            msg.next = null;
                            if (DEBUG) Log.v(TAG, "Returning message: " + msg);
                            msg.markInUse();
    
                            //返回拿到的消息
                            return msg;
                        }
                    } else {
                        //没有消息,nextPollTimeoutMillis复位
                        // No more messages.
                        nextPollTimeoutMillis = -1;
                    }
    
                    // Process the quit message now that all pending messages have been handled.
                    if (mQuitting) {
                        dispose();
                        return null;
                    }
    
                    // If first time idle, then get the number of idlers to run.
                    // Idle handles only run if the queue is empty or if the first message
                    // in the queue (possibly a barrier) is due to be handled in the future.
                    if (pendingIdleHandlerCount < 0
                            && (mMessages == null || now < mMessages.when)) {
                        pendingIdleHandlerCount = mIdleHandlers.size();
                    }
                    if (pendingIdleHandlerCount <= 0) {
                        // No idle handlers to run.  Loop and wait some more.
                        mBlocked = true;
                        continue;
                    }
    
                    if (mPendingIdleHandlers == null) {
                        mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)];
                    }
                    mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers);
                }
    
                // Run the idle handlers.
                // We only ever reach this code block during the first iteration.
                for (int i = 0; i < pendingIdleHandlerCount; i++) {
                    final IdleHandler idler = mPendingIdleHandlers[i];
                    mPendingIdleHandlers[i] = null; // release the reference to the handler
    
                    boolean keep = false;
                    try {
                        keep = idler.queueIdle();
                    } catch (Throwable t) {
                        Log.wtf(TAG, "IdleHandler threw exception", t);
                    }
    
                    if (!keep) {
                        synchronized (this) {
                            mIdleHandlers.remove(idler);
                        }
                    }
                }
    
                // Reset the idle handler count to 0 so we do not run them again.
                pendingIdleHandlerCount = 0;
    
                // While calling an idle handler, a new message could have been delivered
                // so go back and look again for a pending message without waiting.
                nextPollTimeoutMillis = 0;
            }
        }
    
    

    上面的next()部分代码中有部分是native相关的知识,这里不做说明,我们只了解next执行的思路。next()方法中有一个无限循环,里面调用了阻塞方法,如果有消息或者等待延迟的时间到了才不会阻塞,系统将继续执行,在获取到消息后会将消息赋值给新的变量,并将这个消息从单链表中删除。

    Looper.loop()里面的死循环不会死机是因为MessageQueue.next()里面调用了native层的函数阻塞了程序的循环。

    移除消息 removeMessages()

    就是将消息从链表移除,同时将移除的消息添加到消息池,提供循环复用。
    采用了两个while循环,第一个循环是从队头开始,移除符合条件的消息,第二个循环是从头部移除完连续的满足条件的消息之后,再从队列后面继续查询是否有满足条件的消息需要被移除。

    
    
    消息退出quit()方法

    Looper调用 Looper.quit()

    public void quit() {
            mQueue.quit(false);
    }
    
    public void quitSafely() {
            mQueue.quit(true);
    }
    

    MessageQueue.quite(boolean safe)

    void quit(boolean safe) {
            if (!mQuitAllowed) {
                throw new IllegalStateException("Main thread not allowed to quit.");
            }
    
            synchronized (this) {
                if (mQuitting) {
                    return;
                }
                mQuitting = true;
    
                if (safe) {
                    //移除延迟消息 Looper.quitSafely()调用
                    removeAllFutureMessagesLocked();
                } else {
                    //移除全部消息 Looper.quit()调用
                    removeAllMessagesLocked();
                }
    
                // We can assume mPtr != 0 because mQuitting was previously false.
                nativeWake(mPtr);
            }
        }
    

    removeAllMessagesLocked()

    // 该方法是清楚消息列表的全部消息
    private void removeAllMessagesLocked() {
        Message p = mMessages; // mMessages 为消息队列的表头的消息
        // 清除列表中的所有消息,recycleUnchecked()为进行回收
        while (p != null) { 
            Message n = p.next;
            p.recycleUnchecked();
         p = n;
        }
        // mMessages 为 null 时Looper 的 for 循环就会结束
        mMessages = null;
    }
    
    

    removeAllFutureMessagesLocked()

    // 该方法值清除出延迟的消息,其他非延迟消息,依旧让他执行。
    private void removeAllFutureMessagesLocked() {
        //获取从开机到现在的时间
        final long now = SystemClock.uptimeMillis();
        Message p = mMessages;
        if (p != null) {
            //如果是延迟消息,那么整个消息队列都会清楚!
            if (p.when > now) {
                removeAllMessagesLocked();
            } else {
                Message n;
                for (;;) {
                    n = p.next;
                    if (n == null) {
                        return;
                    }
                    // 如果当前消息是延迟消息,跳出循环
                    if (n.when > now) {
                        break;
                    }
                    p = n;
                }
                p.next = null;
                //剩下的消息在do while中
                do {
                    p = n;
                    n = p.next;
                    p.recycleUnchecked();
                } while (n != null);
            }
        }
    }
    
    
    同步消息屏障
         public int postSyncBarrier() {
            return postSyncBarrier(SystemClock.uptimeMillis());
        }
    
         private int postSyncBarrier(long when) {
            // Enqueue a new sync barrier token.
            // We don't need to wake the queue because the purpose of a barrier is to stall it.
            synchronized (this) {
                final int token = mNextBarrierToken++;
                final Message msg = Message.obtain();
                msg.markInUse();
                msg.when = when;
                msg.arg1 = token;
    
                Message prev = null;
                Message p = mMessages;
                if (when != 0) {
                    while (p != null && p.when <= when) {
                        prev = p;
                        p = p.next;
                    }
                }
                if (prev != null) { // invariant: p == prev.next
                    msg.next = p;
                    prev.next = msg;
                } else {
                    msg.next = p;
                    mMessages = msg;
                }
                return token;
            }
        }
    
    

    相关文章

      网友评论

          本文标题:Android-MessageQueue

          本文链接:https://www.haomeiwen.com/subject/cqkzyktx.html