美文网首页
Java层 MessageQueue 消息队列分析

Java层 MessageQueue 消息队列分析

作者: feifei_fly | 来源:发表于2021-01-13 20:25 被阅读0次
14201610540457_.pic.jpg

Java层MessageQueue分析

Native 层的MessageQueue简析

带着问题找答案:

本文主要解答,handler.sendMessageDelayed()延时消息是如何实现的?

1、Thread、Looper、MessageQueue、Handler 铁三角关系

1.1、Looper是依附在Thread之上的。一个Thread 对一个Looper

public final class Looper {
  static final ThreadLocal<Looper> sThreadLocal = new ThreadLocal<Looper>();
   private static void prepare(boolean quitAllowed) {
    if (sThreadLocal.get() != null) {
        throw new RuntimeException("Only one Looper may be created per thread");
    }
    sThreadLocal.set(new Looper(quitAllowed));
  }
}

调用Looper.prepare()创建Looper后是保存在sThreadLocal中。

1.2、建立起消息循环,需要线程中调用Looper.loop()

[Thread]

public class HandlerThread extends Thread{
 @Override
    public void run() {
        Looper.prepare();
        Looper.loop();
    }
}

1.3、Handler 持有run起来的Looper,是MessageQueue的入口和出口。

实例化Handler 需要传入一个Looper对象

 HandlerThread handerThread = new HandlerThread("demo");
 Handler handler = new Handler(handerThread.getLooper()){
    @Override
    public void handleMessage(@NonNull Message msg) {
        super.handleMessage(msg);
    }
};

1.3.1、Message 的核心属性

Message 的核心属性

public final class Message implements Parcelable {

          public long when; //(1)消费该消息的预期时间点
         
          Runnable callback; //(2)Handler.post()方法传入的Runnable对象

          Handler target; // (3)发送该消息的handler,最终也是该Message消息的消费者;配合what执行具体的业务操作
          public int what; // what 标识具体的业务
     }
  • when 标识消费该消息的预期时间点
  • callback 记录Handler.post()方法传入的Runnable对象
  • target 发送该消息的handler,最终也是该Message消息的消费者
  • what 区分具体的业务

1.3.2、向Handler发送Message

- handler.sendMessageDelayed(Message.obtain(),1000)
- handler.sendMessage(Message.obtain())
- handler.post(Runnable {  })
- handler.postDelayed(Runnable {  },100)

向handler中添加消息 有多种形式:,无论是sendMessage() 还是post(),最终都会封装一个Message对象,然后调用sendMessageAtTime()方法。

public boolean sendMessageAtTime(@NonNull Message msg, long uptimeMillis) {
        MessageQueue queue = mQueue;
        if (queue == null) {
            RuntimeException e = new RuntimeException(
                    this + " sendMessageAtTime() called with no mQueue");
            Log.w("Looper", e.getMessage(), e);
            return false;
        }
        return enqueueMessage(queue, msg, uptimeMillis);
    }
sendMessageDelayed()
 public final boolean sendMessageDelayed(@NonNull Message msg, long delayMillis) {
        if (delayMillis < 0) {
            delayMillis = 0;
        }
        return sendMessageAtTime(msg, SystemClock.uptimeMillis() + delayMillis);
    }
postDelayed()

Runnable会封装成一个Message对象(Runnable保存Message.callBack属性上)
然后执行sendMessageDelayed()


 private static Message getPostMessage(Runnable r) {
        Message m = Message.obtain();
        m.callback = r;
        return m;
    }
    
 public final boolean postDelayed(@NonNull Runnable r, long delayMillis) {
    return sendMessageDelayed(getPostMessage(r), delayMillis);
}

1.3.3、消费Message

Message从MessageQueue取出之后,会交给target Handler来处理,最终调用Handler.dispatchMessage()

Message msg = queue.next(); // might block
msg.target.dispatchMessage(msg);
dispatchMessage
public void dispatchMessage(@NonNull Message msg) {
        //(1)Runnable != null 优先执行Runnable()
        if (msg.callback != null) {
            handleCallback(msg);
        } else {
            if (mCallback != null) {
                if (mCallback.handleMessage(msg)) {
                    return;
                }
            }
            handleMessage(msg);
        }
    }
    
  • 通过handler.post()发送的消息,Message.callBack 不为空,优先执行Runnable.run()
 private static void handleCallback(Message message) {
        message.callback.run();
    }
  • Handler 构建时 若指定了mCallback 回调,则由mCallback 来处理消息。一般情况mCallback都为null,所以此种情况可以不考虑
  • 然后Message 交由Handler.handleMessage()来处理
class Handler{
   public void handleMessage(@NonNull Message msg) {}   
}

通过handler.sendMessage()发送的消息,需要复写Handler.handleMessage()来具体处理消息

二、延时消息是如何实现的。

- handler.sendMessageDelayed(Message.obtain(),1000)
- handler.postDelayed(Runnable {  },100)

这就要看MessageQueue对象了。

2.1、入队列

[Handler]

private boolean enqueueMessage(@NonNull MessageQueue queue, @NonNull Message msg,
            long uptimeMillis) {
        msg.target = this;
        msg.workSourceUid = ThreadLocalWorkSource.getUid();

        if (mAsynchronous) {
            msg.setAsynchronous(true);
        }
        return queue.enqueueMessage(msg, uptimeMillis);
    }

Handler.enqueueMessage()方法中

  • 将handler本身 赋给了msg.target
  • 调用MessageQueue.queueMessage()

MessageQueue中最重要的就是两个方法:
1.enqueueMessage向队列中插入消息
2.next 从队列中取出消息

我们先分析enqueueMessage:

boolean enqueueMessage(Message msg, long when) {


        if (msg.target == null) { //排除msg.target == null 的case
            throw new IllegalArgumentException("Message must have a target.");
        }
        if (msg.isInUse()) { //排除 正在使用的msg
            throw new IllegalStateException(msg + " This message is already in use.");
        }

        synchronized (this) {
            if (mQuitting) {
                IllegalStateException e = new IllegalStateException(
                        msg.target + " sending message to a Handler on a dead thread");
                Log.w(TAG, e.getMessage(), e);
                msg.recycle();
                return false;
            }

            msg.markInUse();
            msg.when = when;
            Message p = mMessages; //MesssageQueue中的消息队列 以单向链表的形式存在;mMessages 指向链表中的第一个幻速
            boolean needWake;

            //(1)新消息插入链表表头:a、当前队列没有待处理的消息 或者 b、新消息msg.when = 0(这种情况一般不存在)或者 c、新消息的触发时间 早于mMessages表头消息的触发时间
            if (p == null || when == 0 || when < p.when) {
                // New head, wake up the event queue if blocked.
                msg.next = p;
                mMessages = msg;
                needWake = mBlocked;
            } else { //(2)将新消息按照msg.when时间的先后顺序,插入到mMessages链表的中间位置.使整个链表以时间排序。
                // Inserted within the middle of the queue.  Usually we don't have to wake
                // up the event queue unless there is a barrier at the head of the queue
                // and the message is the earliest asynchronous message in the queue.
                needWake = mBlocked && p.target == null && msg.isAsynchronous();
                Message prev;
                for (;;) {
                    prev = p;
                    p = p.next;
                    if (p == null || when < p.when) {
                        break;
                    }
                    if (needWake && p.isAsynchronous()) {
                        needWake = false;
                    }
                }
                msg.next = p; // invariant: p == prev.next
                prev.next = msg;
            }

            // We can assume mPtr != 0 because mQuitting is false.
            //(3)若当前是阻塞状态,则唤醒next()操作
            if (needWake) {
                nativeWake(mPtr);
            }
        }
        return true;
    }

MessageQueue中的消息 是一个单向链表的形式保存的,mMessages 变量 指向链表的表头。Message链表中的元素是以触发时间(when)为基准,从小道大排列的,when小的排在链表前面,优先被处理;when大的排在链表的后面,延后处理。

2.1.1、 enqueueMessage 主要做了两件事情:

(1)将新来的Message消息 插入到链表中合适的位置。

满足以下条件,新message会被插入到链表头部

  • a、当前队列没有待处理的消息
  • b、新消息msg.when = 0(这种情况一般不存在)
  • c、新消息的触发时间 早于mMessages表头消息的触发时间
    其他情况,新message会被插入到链表中部合适位置。
(2)满足条件时,唤醒队列
 if (needWake) {
    nativeWake(mPtr);
 }

什么时候需要唤醒队列:

  • 新消息插入链表头部时,需要立即唤醒队列
  • 新消息插入链表中部时,一般不需要立即唤醒;但是当链表表头是一个消息屏障,且先消息是一个异步消息时,才需要唤醒队列。

2.1.2、next() 从队列中消费Message,进行分发处理

    Message next() {

        int pendingIdleHandlerCount = -1; // -1 only during first iteration
        int nextPollTimeoutMillis = 0;
        //自旋
        for (;;) {
            if (nextPollTimeoutMillis != 0) {
                Binder.flushPendingCommands();
            }

            //(1)nativePollOnce 尝试阻塞
            //如果nextPollTimeoutMillis=-1,一直阻塞不会超时。
            //如果nextPollTimeoutMillis=0,不会阻塞,立即返回。
            //如果nextPollTimeoutMillis>0,最长阻塞nextPollTimeoutMillis毫秒(超时),如果期间有程序唤醒会立即返回。
            nativePollOnce(ptr, nextPollTimeoutMillis);

            synchronized (this) {
                // Try to retrieve the next message.  Return if found.
                final long now = SystemClock.uptimeMillis();
                Message prevMsg = null;
                Message msg = mMessages;

                //(2)如果链表表头是一个同步屏障消息,则跳过众多同步消息,找到链表中第一个异步消息,进行分发处理
                if (msg != null && msg.target == null) {
                    // Stalled by a barrier.  Find the next asynchronous message in the queue.
                    do {
                        prevMsg = msg;
                        msg = msg.next;
                    } while (msg != null && !msg.isAsynchronous());
                }

                //(3) 返回取到的Message 
                if (msg != null) {
                    //msg尚未到达触发时间,则计算新的阻塞超时时间nextPollTimeoutMillis,下次循环触发队列阻塞
                    if (now < msg.when) {
                        // Next message is not ready.  Set a timeout to wake up when it is ready.
                        nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
                    } else {
                        //从链表中移除该消息后,直接返回
                        // Got a message.
                        mBlocked = false;
                        if (prevMsg != null) {
                            prevMsg.next = msg.next;
                        } else {
                            mMessages = msg.next;
                        }
                        msg.next = null;
                        msg.markInUse();
                        return msg;
                    }
                } else { //没有找到异步消息,设置nextPollTimeoutMillis=-1,队列阻塞
                    // No more messages.
                    nextPollTimeoutMillis = -1;
                }

                // Process the quit message now that all pending messages have been handled.
                //(4)MessageQueue执行了quit(),此处释放MessageQueue
                if (mQuitting) {
                    dispose();
                    return null;
                }


                //(5) 对已注册的HandlerIdle回调的处理
        
                if (pendingIdleHandlerCount < 0
                        && (mMessages == null || now < mMessages.when)) {
                    pendingIdleHandlerCount = mIdleHandlers.size();
                }
                if (pendingIdleHandlerCount <= 0) {
                    // No idle handlers to run.  Loop and wait some more.
                    mBlocked = true;
                    continue;
                }

                if (mPendingIdleHandlers == null) {
                    mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)];
                }
                mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers);
            }

            // Run the idle handlers.
            // We only ever reach this code block during the first iteration.
            for (int i = 0; i < pendingIdleHandlerCount; i++) {
                final IdleHandler idler = mPendingIdleHandlers[i];
                mPendingIdleHandlers[i] = null; // release the reference to the handler

                boolean keep = false;
                try {
                    keep = idler.queueIdle();
                } catch (Throwable t) {
                    Log.wtf(TAG, "IdleHandler threw exception", t);
                }

                if (!keep) {
                    synchronized (this) {
                        mIdleHandlers.remove(idler);
                    }
                }
            }

            // Reset the idle handler count to 0 so we do not run them again.
            pendingIdleHandlerCount = 0;

            // While calling an idle handler, a new message could have been delivered
            // so go back and look again for a pending message without waiting.
            nextPollTimeoutMillis = 0;
        }
    }

next()方法大概可以描述为:从Message链表中取出一条Message返回;当链表中无消息,或者链表中第一个Message尚未到达触发时间时,则阻塞next()方法。

代码比较长,我们分几部分来讲:

(1) nativePollOnce()

通过Native层的epoll来阻塞住当前线程

 nativePollOnce(ptr, nextPollTimeoutMillis);
 private native void nativePollOnce(long ptr, int timeoutMillis)

nativePollOnce根据传入的参数nextPollTimeoutMillis 会有不同的阻塞行为

  • 如果nextPollTimeoutMillis=-1,一直阻塞不会超时。
  • 如果nextPollTimeoutMillis=0,不会阻塞,立即返回。
  • 如果nextPollTimeoutMillis>0,最长阻塞nextPollTimeoutMillis毫秒(超时),如果期间有程序唤醒会立即返回。
(2)处理同步屏障消息

如果链表表头是一个同步屏障消息,则跳过众多同步消息,找到链表中第一个异步消息,进行分发处理

Message 分为同步消息和异步消息。

class Message{
    public boolean isAsynchronous() {
        return (flags & FLAG_ASYNCHRONOUS) != 0;
    }
    public void setAsynchronous(boolean async) {
        if (async) {
            flags |= FLAG_ASYNCHRONOUS;
        } else {
            flags &= ~FLAG_ASYNCHRONOUS;
        }
    }
}

通常我们使用Handler发消息都是同步消息,发出去之后就会在消息队列里面排队处理。我们都知道,Android系统16ms会刷新一次屏幕,如果主线程的消息过多,在16ms之内没有执行完,必然会造成卡顿或者掉帧。那怎么才能不排队,没有延时的处理呢?这个时候就需要异步消息。在处理异步消息的时候,我们就需要同步屏障,让异步消息不用排队等候处理。

可以理解为同步屏障是一堵墙,把同步消息队列拦住,先处理异步消息,等异步消息处理完了,这堵墙就会取消,然后继续处理同步消息。

MessageQueue里面有postSyncBarrier()可以发送同步屏障消息

public int postSyncBarrier() {
        return postSyncBarrier(SystemClock.uptimeMillis());
    }

    private int postSyncBarrier(long when) {
        // Enqueue a new sync barrier token.
        // We don't need to wake the queue because the purpose of a barrier is to stall it.
        synchronized (this) {
            final int token = mNextBarrierToken++;
            final Message msg = Message.obtain();
            msg.markInUse();
            msg.when = when;
            msg.arg1 = token;

            Message prev = null;
            Message p = mMessages;
            if (when != 0) {
                while (p != null && p.when <= when) {
                    prev = p;
                    p = p.next;
                }
            }
            if (prev != null) { // invariant: p == prev.next
                msg.next = p;
                prev.next = msg;
            } else {
                msg.next = p;
                mMessages = msg;
            }
            return token;
        }
    }

值得注意的是,同步屏障消息没有target,普通的消息的必须有target的。

再回到MessageQueue.next() 方法

//(2)如果链表表头是一个同步屏障消息,则跳过众多同步消息,找到链表中第一个异步消息,进行分发处理
        if (msg != null && msg.target == null) {
            // Stalled by a barrier.  Find the next asynchronous message in the queue.
            do {
                prevMsg = msg;
                msg = msg.next;
            } while (msg != null && !msg.isAsynchronous());
        }

我们看到,如果链表表头是一个同步屏障消息,就会遍历链表,返回第一个待处理异步消息。这样就跳过了前面众多的同步消息。

(3) 取到的Message返回还是阻塞?
//(3) 返回取到的Message 
        if (msg != null) {
            //msg尚未到达触发时间,则计算新的阻塞超时时间nextPollTimeoutMillis,下次循环触发队列阻塞
            if (now < msg.when) {
                // Next message is not ready.  Set a timeout to wake up when it is ready.
                nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
            } else {
                //从链表中移除该消息后,直接返回
                // Got a message.
                mBlocked = false;
                if (prevMsg != null) {
                    prevMsg.next = msg.next;
                } else {
                    mMessages = msg.next;
                }
                msg.next = null;
                msg.markInUse();
                return msg;
            }
        } else { //没有找到异步消息,设置nextPollTimeoutMillis=-1,队列阻塞
            // No more messages.
            nextPollTimeoutMillis = -1;
        }

回到next()中的代码块,取到待处理的message后做了如下处理

  • 如果msg == null,说明链表中没有消息,则nextPollTimeoutMillis = -1,下次循环 会无限阻塞。
  • msg !=null,并且msg.when 符合触发条件,则直接返回
  • msg !=null,但是msg.when 尚未到达预期的触发时间点,则重新计算nextPollTimeoutMillis,下次循环进行固定时长的阻塞。
(4)mQuitting 的处理

如果调用了MessageQueue.quit() ,mQuitting = true,队列中所有的消息都会处理后,会调用dispose 释放MessageQueue

if (mQuitting) {
    dispose();
    return null;
}
(5)对已注册的HandlerIdle回调的处理

MessageQueue可以注册HandlerIdle监听,此处对注册的HandlerIdle做回调处理。

 //(5) 对已注册的HandlerIdle回调的处理
        
        if (pendingIdleHandlerCount < 0
                && (mMessages == null || now < mMessages.when)) {
            pendingIdleHandlerCount = mIdleHandlers.size();
        }
        if (pendingIdleHandlerCount <= 0) {
            // No idle handlers to run.  Loop and wait some more.
            mBlocked = true;
            continue;
        }

        if (mPendingIdleHandlers == null) {
            mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)];
        }
        mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers);
                
                
                
                
         for (int i = 0; i < pendingIdleHandlerCount; i++) {
        final IdleHandler idler = mPendingIdleHandlers[i];
        mPendingIdleHandlers[i] = null; // release the reference to the handler

        boolean keep = false;
        try {
            keep = idler.queueIdle();
        } catch (Throwable t) {
            Log.wtf(TAG, "IdleHandler threw exception", t);
        }

        if (!keep) {
            synchronized (this) {
                mIdleHandlers.remove(idler);
            }
        }
    }

三、参考文章

https://zhuanlan.zhihu.com/p/265859150

https://www.jianshu.com/p/8c829dc15950

https://www.jianshu.com/p/48cf21ad637b

相关文章

网友评论

      本文标题:Java层 MessageQueue 消息队列分析

      本文链接:https://www.haomeiwen.com/subject/qvypaktx.html