美文网首页
MessageQueue源码详解

MessageQueue源码详解

作者: 断了谁的弦 | 来源:发表于2019-10-25 18:17 被阅读0次

    构造函数

        MessageQueue(boolean quitAllowed) {
            mQuitAllowed = quitAllowed;
            mPtr = nativeInit();
        }
    

    参数表示是否允许退出,如果允许退出则可以调用quit方法来清空所有的Message,并且退出Looper。在Looper的prepareMainLooper()方法中赋值给MessageQueue构造函数的参数为false,因为主线程是不允许退出的,否则程序就终止了。

    插入消息

        boolean enqueueMessage(Message msg, long when) {
            //检查target是否为null,target就是发送这个msg的Handler,在取消息的时候会调用msg.target.dispatchMessage(msg)来处理这个消息。
            if (msg.target == null) {
                throw new IllegalArgumentException("Message must have a target.");
            }
            
            //判断该消息是否已经标记了在使用,如果是则抛出异常。因为标志为在使用之后就会把该消息插入消息队列中,而且消息队列是单链表的数据结构,如果重复使用的话会导致单链表出现环的情况,遍历的时候会导致死循环。
            if (msg.isInUse()) {
                throw new IllegalStateException(msg + " This message is already in use.");
            }
    
            synchronized (this) {
                //判断该消息队列是否已经停止了,如果是则不再让新的消息入队。
                if (mQuitting) {
                    IllegalStateException e = new IllegalStateException(
                            msg.target + " sending message to a Handler on a dead thread");
                    Log.w(TAG, e.getMessage(), e);
                    msg.recycle();
                    return false;
                }
    
                //把该消息标志为正在使用,并赋值处理的时间
                msg.markInUse();
                msg.when = when;
                Message p = mMessages;
                boolean needWake;
                if (p == null || when == 0 || when < p.when) {
                    //如果消息队列没有消息,该消息的处理时间为0或者该消息的处理时间早于消息队列第一个消息的处理时间,则把该消息当做新的头部插入消息队列。并判断是否需要唤醒消息队列,如果该消息队列正在阻塞则唤醒。
                    msg.next = p;
                    mMessages = msg;
                    needWake = mBlocked;
                } else {
                    //如果消息队列正在阻塞而且第一条消息是一个消息屏障而新消息是异步消息则需要唤醒消息队列。因为在取消息的时候如果遇到消息屏障则消息后面的普通消息都不会处理,只会处理异步消息,如果没有异步消息则消息队列会阻塞。所以这里插入了一个异步消息则需要把消息队列唤醒。
                    needWake = mBlocked && p.target == null && msg.isAsynchronous();
                    Message prev;
                    for (;;) {
                        prev = p;
                        p = p.next;
                        if (p == null || when < p.when) {
                            //遍历到队尾或者找到处理时间比新消息晚的则结束循环,这样就保证了队列的消息是按照处理时间从小到大排序的。
                            break;
                        }
                        if (needWake && p.isAsynchronous()) {
                            //如果在队列中有比新消息处理时间早而且是异步消息的,则不需要唤醒,因为这时消息队列正在阻塞等这个异步消息的处理时间到。
                            needWake = false;
                        }
                    }
                    //把新消息插入消息队列
                    msg.next = p; 
                    prev.next = msg;
                }
    
                // We can assume mPtr != 0 because mQuitting is false.
                if (needWake) {
                    //如果需要则唤醒消息队列
                    nativeWake(mPtr);
                }
            }
            return true;
        }
    

    该方法是一个包限定的方法,在Handler的sendMessage系列方法里面最终都是通过调用这个方法来把Message插入消息队列的。插入的Message根据when从小到大在队列中排好序。

    获取消息

        Message next() {
            //当停止消息循环的时候会把mPtr赋值为0,此时直接返回null。
            final long ptr = mPtr;
            if (ptr == 0) {
                return null;
            }
    
            int pendingIdleHandlerCount = -1; // -1 only during first iteration
            int nextPollTimeoutMillis = 0;
            for (;;) {
                if (nextPollTimeoutMillis != 0) {
                    Binder.flushPendingCommands();
                }
    
                //阻塞nextPollTimeoutMillis的时间
                nativePollOnce(ptr, nextPollTimeoutMillis);
    
                synchronized (this) {
                    // Try to retrieve the next message.  Return if found.
                    final long now = SystemClock.uptimeMillis();
                    Message prevMsg = null;
                    Message msg = mMessages;
                    if (msg != null && msg.target == null) {
                        //如果消息队列的第一条消息是一个同步屏障,则寻找队列中的异步消息。
                        do {
                            prevMsg = msg;
                            msg = msg.next;
                        } while (msg != null && !msg.isAsynchronous());
                    }
                    
                    if (msg != null) {
                        if (now < msg.when) {
                            //还没到找到的消息的处理时间,则计算出还有多久才到处理时间,则下一次循环的时候就回到上面的阻塞了。
                            nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
                        } else {
                            // 把消息从消息队列中删除,然后返回该消息。
                            mBlocked = false;
                            if (prevMsg != null) {
                                prevMsg.next = msg.next;
                            } else {
                                mMessages = msg.next;
                            }
                            msg.next = null;
                            if (DEBUG) Log.v(TAG, "Returning message: " + msg);
                            msg.markInUse();
                            return msg;
                        }
                    } else {
                        //没有消息则赋值为-1,接着会一直阻塞直到被唤醒。
                        nextPollTimeoutMillis = -1;
                    }
    
                    //如果消息队列正在停止,则调用dispose(),里面会调用native方法销毁,而且会把mPtr赋值为0,所以在next()方法一开始的时候就会直接返回null了。
                    if (mQuitting) {
                        dispose();
                        return null;
                    }
    
                    //执行到这里则代表上面没有获取到能执行的消息,则此时消息队列是处于空闲状态的,在消息队列进入阻塞时进行一些空闲的处理。
                    //需要注意的是只有在第一次的时候才会进行处理,因为下面会把pendingIdleHandlerCount赋值为0,则会进入continue进行下一次循环。
                    if (pendingIdleHandlerCount < 0
                            && (mMessages == null || now < mMessages.when)) {
                        pendingIdleHandlerCount = mIdleHandlers.size();
                    }
                    if (pendingIdleHandlerCount <= 0) {
                        //如果没有要处理的IdleHandler则标志阻塞
                        mBlocked = true;
                        continue;
                    }
    
                    if (mPendingIdleHandlers == null) {
                        mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)];
                    }
                    mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers);
                }
     
                // 运行IdleHandler。
                for (int i = 0; i < pendingIdleHandlerCount; i++) {
                    final IdleHandler idler = mPendingIdleHandlers[i];
                    mPendingIdleHandlers[i] = null; // release the reference to the handler
    
                    boolean keep = false;
                    try {
                        //在ActivityThread中有一个GcIdler,里面就是执行了doGcIfNeeded()方法。方法的返回值代表是否要保留这个IdleHandler,如果返回false则会从mIdleHandlers中删除。
                        keep = idler.queueIdle();
                    } catch (Throwable t) {
                        Log.wtf(TAG, "IdleHandler threw exception", t);
                    }
    
                    if (!keep) {
                        synchronized (this) {
                            mIdleHandlers.remove(idler);
                        }
                    }
                }
    
                //赋值为0,只处理一次IdleHandler
                pendingIdleHandlerCount = 0;
    
                //当处理IdleHandler的时候有可能会有新消息入队,则重置阻塞时间为0,不再等待而是开始下一轮循环。
                nextPollTimeoutMillis = 0;
            }
        }
    

    插入同步屏障

        public int postSyncBarrier() {
            return postSyncBarrier(SystemClock.uptimeMillis());
        }
    
        //时间就是当前时间
        private int postSyncBarrier(long when) {
            // Enqueue a new sync barrier token.
            // We don't need to wake the queue because the purpose of a barrier is to stall it.
            synchronized (this) {
                final int token = mNextBarrierToken++;
                final Message msg = Message.obtain();
                msg.markInUse();
                msg.when = when;
                msg.arg1 = token;
    
                Message prev = null;
                Message p = mMessages;
                if (when != 0) {
                    //循环直到找到消息队列中消息的处理时间比同步屏障的处理消息晚的
                    //假设同步屏障的处理时间为25,消息队列的处理时间分别是10、20、30,则循环完后prev指向20,p指向30(实际的处理时间是开机到现在所经过的毫秒。
                    while (p != null && p.when <= when) {
                        prev = p;
                        p = p.next;
                    }
                }
                
                //把同步屏障插入消息队列
                if (prev != null) { // invariant: p == prev.next
                    //插入消息队列里面,像上面假设的话则是把25的next指向30,把20的next指向25,达到插入的效果,则最终消息队列的时间为10、20、25、30。
                    msg.next = p;
                    prev.next = msg;
                } else {
                    //prev为null,则消息队列为null或者第一个消息的处理时间比同步屏障晚,则把同步屏障插入消息队列的头部。
                    msg.next = p;
                    mMessages = msg;
                }
                return token;
            }
        }
    

    作为内存屏障的Message相当于一个标记,当MessageQueue取消息的时候,如果发现了内存屏障,则只会处理内存屏障后面的异步消息(isAsynchronous()方法返回true)。其他消息是还会放在消息队列里,暂时不处理,直到移除了内存屏障。

    删除内存屏障

        public void removeSyncBarrier(int token) {
            // Remove a sync barrier token from the queue.
            // If the queue is no longer stalled by a barrier then wake it.
            synchronized (this) {
                Message prev = null;
                Message p = mMessages;
                while (p != null && (p.target != null || p.arg1 != token)) {
                    //根据token找到对应的同步屏障,此时p指向这个同步屏障
                    prev = p;
                    p = p.next;
                }
                
                //没找到直接抛异常
                if (p == null) {
                    throw new IllegalStateException("The specified message queue synchronization "
                            + " barrier token has not been posted or has already been removed.");
                }
                
                final boolean needWake;
                if (prev != null) {
                    //把同步屏障从消息队列中删除。按照上面插入同步屏障的假设则是把20的next指向30,然后在下面的p.recycleUnchecked()会把25的next置为null,从而达到了删除的效果。
                    prev.next = p.next;
                    needWake = false;
                } else {
                    //同步屏障是消息队列的第一个消息,则把消息队列指向下一个消息
                    //如果消息队列的第一个消息为null则代表没有更多消息要处理,唤醒来设置阻塞时间为-1;如果target不为null则代表之前因为有同步屏障的存在没有处理该消息,唤醒来处理或者重新计算阻塞时间。
                    mMessages = p.next;
                    needWake = mMessages == null || mMessages.target != null;
                }
                p.recycleUnchecked();
    
                // If the loop is quitting then it is already awake.
                // We can assume mPtr != 0 when mQuitting is false.
                if (needWake && !mQuitting) {
                    nativeWake(mPtr);
                }
            }
        }
    

    退出消息循环

        void quit(boolean safe) {
            if (!mQuitAllowed) {
                throw new IllegalStateException("Main thread not allowed to quit.");
            }
    
            synchronized (this) {
                if (mQuitting) {
                    return;
                }
                mQuitting = true;
    
                if (safe) {
                    removeAllFutureMessagesLocked();
                } else {
                    removeAllMessagesLocked();
                }
    
                // We can assume mPtr != 0 because mQuitting was previously false.
                nativeWake(mPtr);
            }
        }
    

    该方法用来退出消息循环并且终止Looper。首先判断是否允许退出,如果是不允许退出则直接抛异常,否则进入同步代码块。

    接着判断是否正在退出,如果是就返回,否则把mQuitting赋值为true。

    然后看参数safe的值,如果是安全退出则只会把未到处理时间的Message从消息队列删除,如果不是安全退出则把整个消息队列的消息都删除。

    最后调用nativeWake()唤醒MessageQueue,这样在next()就会返回null,Looper检测到null之后就会return退出loop()的循环。

    removeAllFutureMessagesLocked
        private void removeAllFutureMessagesLocked() {
            final long now = SystemClock.uptimeMillis();
            Message p = mMessages;
            if (p != null) {
                if (p.when > now) {
                    removeAllMessagesLocked();
                } else {
                    Message n;
                    for (;;) {
                        n = p.next;
                        if (n == null) {
                            return;
                        }
                        if (n.when > now) {
                            break;
                        }
                        p = n;
                    }
                    p.next = null;
                    do {
                        p = n;
                        n = p.next;
                        p.recycleUnchecked();
                    } while (n != null);
                }
            }
        }
    

    这个方法主要是删除消息队列中要处理时间比现在迟的消息。首先判断消息队列是否为null,然后再判断队列的第一个消息的时候是否大于现在的时间,如果是则调用removeAllMessagesLocked()。因为消息是按照要时间来排序的,如果时间小的则排在前面,这里第一个消息都比现在的时间大,则整个消息队列的时间都是比现在迟的,所以全部删除即可。

    如果消息的处理时间小于当前时间,则通过for循环(相当于while)找到消息队列中要处理的时间比现在迟的消息的位置。然后再把这个消息从消息队列移除,接着把之后的消息全都回收掉。

    removeAllMessagesLocked
        private void removeAllMessagesLocked() {
            Message p = mMessages;
            while (p != null) {
                Message n = p.next;
                p.recycleUnchecked();
                p = n;
            }
            mMessages = null;
        }
    

    从消息队列的第一条开始把全部的消息都回收掉,然后把消息队列赋值为null。这样就达到了删除全部消息的效果。

    相关文章

      网友评论

          本文标题:MessageQueue源码详解

          本文链接:https://www.haomeiwen.com/subject/wfrevctx.html