一、MessageQueue简介
- MessageQueue即消息队列,这个消息队列和Message消息对象对象池里面的消息对象池(单链表)可不是同一个东西。
- MessageQueue是一个消息队列,Handler将Message发送到消息队列中,消息队列会按照一定的规则取出要执行的Message。
- 需要注意的是Java层的MessageQueue负责处理Java的消息,native也有一个MessageQueue负责处理native的消息。
- 这里有必要提一下MessageQueue的数据结构,是一个单向链表,Message对象有个next字段保存列表中的下一个,MessageQueue中的mMessages保存链表的第一个元素。
- 用于将消息插入和读取
- 通过一个单链表的数据结构,维护消息列表
二、MessageQueue类注释
/**
* Low-level class holding the list of messages to be dispatched by a
* {@link Looper}. Messages are not added directly to a MessageQueue,
* but rather through {@link Handler} objects associated with the Looper.
*
* <p>You can retrieve the MessageQueue for the current thread with
* {@link Looper#myQueue() Looper.myQueue()}.
*/
public final class MessageQueue {
....
}
它是一个被Looper分发、低等级的持有Message集合的类。Message并不是直接加到MessageQueue的,而是通过Handler对象和Looper关联到一起。
我们可以通过Looper.myQueue()方法来检索当前线程的MessageQueue
它是一个低等级的持有Messages集合的类,被Looper分发。Messages并不是直接加到MessageQueue的,而是通过Handler对象和Looper关联到一起。我们可以通过Looper.myQueue()方法来检索当前线程的。
三、MessageQueue成员变量
// True if the message queue can be quit.
//用于标示消息队列是否可以被关闭,主线程的消息队列不可关闭
private final boolean mQuitAllowed;
@SuppressWarnings("unused")
// 该变量用于保存native代码中的MessageQueue的指针
private long mPtr; // used by native code
//在MessageQueue中,所有的Message是以链表的形式组织在一起的,该变量保存了链表的第一个元素,也可以说它就是链表的本身
Message mMessages;
//当Handler线程处于空闲状态的时候(MessageQueue没有其他Message时),可以利用它来处理一些事物,该变量就是用于保存这些空闲时候要处理的事务
private final ArrayList<IdleHandler> mIdleHandlers = new ArrayList<IdleHandler>();
// 注册FileDescriptor以及感兴趣的Events,例如文件输入、输出和错误,设置回调函数,最后
// 调用nativeSetFileDescriptorEvent注册到C++层中,
// 当产生相应事件时,由C++层调用Java的DispathEvents,激活相应的回调函数
private SparseArray<FileDescriptorRecord> mFileDescriptorRecords;
// 用于保存将要被执行的IdleHandler
private IdleHandler[] mPendingIdleHandlers;
//标示MessageQueue是否正在关闭。
private boolean mQuitting;
// Indicates whether next() is blocked waiting in pollOnce() with a non-zero timeout.
// 标示 MessageQueue是否阻塞
private boolean mBlocked;
// The next barrier token.
// Barriers are indicated by messages with a null target whose arg1 field carries the token.
// 在MessageQueue里面有一个概念叫做障栅,它用于拦截同步的Message,阻止这些消息被执行,
// 只有异步Message才会放行。障栅本身也是一个Message,只是它的target为null并且arg1用于区分不同的障栅,
// 所以该变量就是用于不断累加生成不同的障栅。
private int mNextBarrierToken;
四、MessageQueue的构造函数
MessageQueue(boolean quitAllowed) {
mQuitAllowed = quitAllowed;
mPtr = nativeInit();
}
MessageQueue只是有一个构造函数,该构造函数是包内可见的,其内部就两行代码,分别是设置了MessageQueue是否可以退出和native层代码的相关初始化。
调用的地方Looper#Looper(boolean quitAllowed)
private Looper(boolean quitAllowed) {
mQueue = new MessageQueue(quitAllowed); // 消息队列
mThread = Thread.currentThread(); // 当前线程
}
在Looper中创建了MessageQueue。
MessageQueue处理消息
消息入队 enqueueMessage()
这个方法主要是用来处理发送消息的,当Handler通过自己enqueueMessage()将消息发送到这该函数中。该函数首先会判断判断是否msg.target有Handler的引用,消息会被按着时间顺序被添加到队列中。
Handler#enqueueMessage()
/**
* 将Msg添加到消息队列中
*
* @param queue 这个就是消息队列 从Looper里面获取到的
* @param msg 发送的具体消息
* @param uptimeMillis 处理时间
* @return
*/
private boolean enqueueMessage(@NonNull MessageQueue queue, @NonNull Message msg, long uptimeMillis) {
// Handler 保存进来了 looper的时候使用这个Handler 处理消息的
// 为什么使用匿名内部类来创建Handler的方法会有内存泄漏的风险?
msg.target = this;
// 线程数据
msg.workSourceUid = ThreadLocalWorkSource.getUid();
// 异步
if (mAsynchronous) {
msg.setAsynchronous(true);
}
// 调用的是 Looper 的 Queen 的函数
return queue.enqueueMessage(msg, uptimeMillis);
}
MessageQueue#enqueueMessage(Message msg, long when)
boolean enqueueMessage(Message msg, long when) {
// msg 必须有target也就是必须有handler
if (msg.target == null) {
throw new IllegalArgumentException("Message must have a target.");
}
if (msg.isInUse()) {
throw new IllegalStateException(msg + " This message is already in use.");
}
// 锁定一下
synchronized (this) {
// 标示MessageQueue是否正在关闭。
if (mQuitting) {
IllegalStateException e = new IllegalStateException(
msg.target + " sending message to a Handler on a dead thread");
Log.w(TAG, e.getMessage(), e);
msg.recycle();
return false;
}
msg.markInUse();
//when 表示这个消息执行的时间,队列是按照消息执行时间排序的
msg.when = when;
Message p = mMessages;
boolean needWake;
if (p == null || when == 0 || when < p.when) {
// 如果p为null则表示当前消息队列没有消息
// New head, wake up the event queue if blocked.
msg.next = p;
// 初始化头消息(改变了总是指向新的消息)
mMessages = msg;
// true代表无消息,阻塞线程,false代表有消息,没有阻塞线程
needWake = mBlocked;
} else {
// Inserted within the middle of the queue. Usually we don't have to wake
// up the event queue unless there is a barrier at the head of the queue
// and the message is the earliest asynchronous message in the queue.
needWake = mBlocked && p.target == null && msg.isAsynchronous();
Message prev;
// 将消息放到队列,消息是按照msg的when 排序
for (;;) {
prev = p;
p = p.next;
if (p == null || when < p.when) {
break;
}
if (needWake && p.isAsynchronous()) {
needWake = false;
}
}
msg.next = p; // invariant: p == prev.next
prev.next = msg;
}
// We can assume mPtr != 0 because mQuitting is false.
if (needWake) {
nativeWake(mPtr);
}
}
return true;
}
消息轮询 next()
Looper.loop() 调用 Message msg = queue.next();
public static void loop() {
....
// 进入loop的主循环方法
for (; ; ) {
Message msg = queue.next(); // might block 可能会阻塞,因为next()方法可能会无限循环
....
// 回收
msg.recycleUnchecked();
}
}
next()
@UnsupportedAppUsage
Message next() {
//nextPollTimeoutMillis 表示nativePollOnce方法需要等待的时间
//nextPollTimeoutMillis=-1表示一直阻塞切不会超时
//nextPollTimeoutMillis>0 表示阻塞时长,可以理解为延迟多长时间发送消息
// Return here if the message loop has already quit and been disposed.
// This can happen if the application tries to restart a looper after quit
// which is not supported.
final long ptr = mPtr;
if (ptr == 0) {
return null;
}
// //表示不会阻塞,立即执行
int pendingIdleHandlerCount = -1; // -1 only during first iteration
int nextPollTimeoutMillis = 0;
for (;;) {
if (nextPollTimeoutMillis != 0) {
Binder.flushPendingCommands();
}
//native的方法,在没有消息的时候会阻塞管道读取端,只有nativePollOnce返回之后才能往下执行
nativePollOnce(ptr, nextPollTimeoutMillis);
synchronized (this) {
//从开机到现在的毫秒数
// Try to retrieve the next message. Return if found.
final long now = SystemClock.uptimeMillis();
Message prevMsg = null;
Message msg = mMessages;
if (msg != null && msg.target == null) {
// 找不是异步而且msg.target不为空的message
// Stalled by a barrier. Find the next asynchronous message in the queue.
do {
prevMsg = msg;
msg = msg.next;
} while (msg != null && !msg.isAsynchronous());
}
if (msg != null) {
//开机到现在的毫秒数如果小于msg.when则代表还未到发送消息的时间
if (now < msg.when) {
// Next message is not ready. Set a timeout to wake up when it is ready.
// 虽然有消息,但是还没有到运行的时候
//计算还有等待多久,并赋值给nextPollTimeoutMillis
nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
} else {
//代表当前没有阻塞
// Got a message.
mBlocked = false;
// 获取msg并且删除该节点
if (prevMsg != null) {
prevMsg.next = msg.next;
} else {
mMessages = msg.next;
}
msg.next = null;
if (DEBUG) Log.v(TAG, "Returning message: " + msg);
msg.markInUse();
//返回拿到的消息
return msg;
}
} else {
//没有消息,nextPollTimeoutMillis复位
// No more messages.
nextPollTimeoutMillis = -1;
}
// Process the quit message now that all pending messages have been handled.
if (mQuitting) {
dispose();
return null;
}
// If first time idle, then get the number of idlers to run.
// Idle handles only run if the queue is empty or if the first message
// in the queue (possibly a barrier) is due to be handled in the future.
if (pendingIdleHandlerCount < 0
&& (mMessages == null || now < mMessages.when)) {
pendingIdleHandlerCount = mIdleHandlers.size();
}
if (pendingIdleHandlerCount <= 0) {
// No idle handlers to run. Loop and wait some more.
mBlocked = true;
continue;
}
if (mPendingIdleHandlers == null) {
mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)];
}
mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers);
}
// Run the idle handlers.
// We only ever reach this code block during the first iteration.
for (int i = 0; i < pendingIdleHandlerCount; i++) {
final IdleHandler idler = mPendingIdleHandlers[i];
mPendingIdleHandlers[i] = null; // release the reference to the handler
boolean keep = false;
try {
keep = idler.queueIdle();
} catch (Throwable t) {
Log.wtf(TAG, "IdleHandler threw exception", t);
}
if (!keep) {
synchronized (this) {
mIdleHandlers.remove(idler);
}
}
}
// Reset the idle handler count to 0 so we do not run them again.
pendingIdleHandlerCount = 0;
// While calling an idle handler, a new message could have been delivered
// so go back and look again for a pending message without waiting.
nextPollTimeoutMillis = 0;
}
}
上面的next()部分代码中有部分是native相关的知识,这里不做说明,我们只了解next执行的思路。next()方法中有一个无限循环,里面调用了阻塞方法,如果有消息或者等待延迟的时间到了才不会阻塞,系统将继续执行,在获取到消息后会将消息赋值给新的变量,并将这个消息从单链表中删除。
Looper.loop()里面的死循环不会死机是因为MessageQueue.next()里面调用了native层的函数阻塞了程序的循环。
移除消息 removeMessages()
就是将消息从链表移除,同时将移除的消息添加到消息池,提供循环复用。
采用了两个while循环,第一个循环是从队头开始,移除符合条件的消息,第二个循环是从头部移除完连续的满足条件的消息之后,再从队列后面继续查询是否有满足条件的消息需要被移除。
消息退出quit()方法
Looper调用 Looper.quit()
public void quit() {
mQueue.quit(false);
}
public void quitSafely() {
mQueue.quit(true);
}
MessageQueue.quite(boolean safe)
void quit(boolean safe) {
if (!mQuitAllowed) {
throw new IllegalStateException("Main thread not allowed to quit.");
}
synchronized (this) {
if (mQuitting) {
return;
}
mQuitting = true;
if (safe) {
//移除延迟消息 Looper.quitSafely()调用
removeAllFutureMessagesLocked();
} else {
//移除全部消息 Looper.quit()调用
removeAllMessagesLocked();
}
// We can assume mPtr != 0 because mQuitting was previously false.
nativeWake(mPtr);
}
}
removeAllMessagesLocked()
// 该方法是清楚消息列表的全部消息
private void removeAllMessagesLocked() {
Message p = mMessages; // mMessages 为消息队列的表头的消息
// 清除列表中的所有消息,recycleUnchecked()为进行回收
while (p != null) {
Message n = p.next;
p.recycleUnchecked();
p = n;
}
// mMessages 为 null 时Looper 的 for 循环就会结束
mMessages = null;
}
removeAllFutureMessagesLocked()
// 该方法值清除出延迟的消息,其他非延迟消息,依旧让他执行。
private void removeAllFutureMessagesLocked() {
//获取从开机到现在的时间
final long now = SystemClock.uptimeMillis();
Message p = mMessages;
if (p != null) {
//如果是延迟消息,那么整个消息队列都会清楚!
if (p.when > now) {
removeAllMessagesLocked();
} else {
Message n;
for (;;) {
n = p.next;
if (n == null) {
return;
}
// 如果当前消息是延迟消息,跳出循环
if (n.when > now) {
break;
}
p = n;
}
p.next = null;
//剩下的消息在do while中
do {
p = n;
n = p.next;
p.recycleUnchecked();
} while (n != null);
}
}
}
同步消息屏障
public int postSyncBarrier() {
return postSyncBarrier(SystemClock.uptimeMillis());
}
private int postSyncBarrier(long when) {
// Enqueue a new sync barrier token.
// We don't need to wake the queue because the purpose of a barrier is to stall it.
synchronized (this) {
final int token = mNextBarrierToken++;
final Message msg = Message.obtain();
msg.markInUse();
msg.when = when;
msg.arg1 = token;
Message prev = null;
Message p = mMessages;
if (when != 0) {
while (p != null && p.when <= when) {
prev = p;
p = p.next;
}
}
if (prev != null) { // invariant: p == prev.next
msg.next = p;
prev.next = msg;
} else {
msg.next = p;
mMessages = msg;
}
return token;
}
}
网友评论