一.前言
最近准备在异步线程使用Handler,发现对其如何实现又忘记了,又要看源码来理解。还是记录下自己的理解思路,已方便记忆。当然异步线程使用Handler还是要用HandlerThread,所以先从HandlerThread下手,在一步步深入。
二.HandlerThread
HandlerThread使用的场景很简单,就是 Thread + Looper 的结合,也就是上面所说的异步线程中使用Handler的情形。所要执行的任务是串行地在子线程中执行。
@Override
public void run() {
mTid = Process.myTid();
Looper.prepare(); //创建当前线程的Loop
synchronized (this) {
mLooper = Looper.myLooper();
notifyAll(); //唤醒等待线程
}
Process.setThreadPriority(mPriority);
onLooperPrepared(); //子类要实现的方法,创建Handler的地方
Looper.loop(); //循环开始
mTid = -1;
}
上面这段代码其实就是创建Handler的使用环境,准备好Looper,onLooperPrepared()中再创建Handler,就可以在线程中使用了。
总结一下,在线程中使用Handler需要三步:
- Looper.prepare();
- 创建Handler
- Looper.loop();
那为什么要这样写才能在异步线程使用Handler呢?
三. Looper
3.1 首先来看Looper.prepare()
static final ThreadLocal<Looper> sThreadLocal = new ThreadLocal<Looper>();
private static void prepare(boolean quitAllowed) {
if (sThreadLocal.get() != null) {
throw new RuntimeException("Only one Looper may be created per thread");
}
sThreadLocal.set(new Looper(quitAllowed));
}
private Looper(boolean quitAllowed) {
mQueue = new MessageQueue(quitAllowed);
mThread = Thread.currentThread();
}
在Looper中,主要有ThreadLocal,MessageQueue,和Thread这三个变量。
- ThreadLocal:用来保存当前线程的Looper,通过set和get来实现写和读,且一个线程只能调用一次Looper.prepare(),否则会抛出异常。Looper与Thread一对一关系。它的作用是为Looper的myLooper()时提供Looper对象返回。
在set时,也将sThreadLocal对象赋值给了当前线程的threadLocals变量;在get时,在读取当前线程的sThreadLocal获取Looper对象。
- MessageQueue:顾名思义,用来存储Message的队列,通过Handler发送的消息就存储在这里,Looper循环调用queue.next()取出消息,如果为空就堵塞.
所以如果不调用quit方法退出循环,MessageQueue会一直堵塞导致线程一直存在。
- Thread:在Looper中,保存这个值,主要在两个地方被调用,一个是isCurrentThread判断是否为当前线程,一个是getThread获取Looper对象的线程。
Thread,Looper,主要通过ThreadLocal进行了关联。Thread中有ThreadLocal,而Looper又存在ThreadLocal中。
ThreadLocal,MessageQueue的实现原理,后面再详细介绍。接下来看看Looper.loop()。
3.2 Looper.loop();
在Handler初始化后,循环就从这里开始了。
/**
* Run the message queue in this thread. Be sure to call
* {@link #quit()} to end the loop.
*/
public static void loop() {
//1.获取Looper
final Looper me = myLooper();
if (me == null) {
throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread.");
}
//从Looper中获取消息队列
final MessageQueue queue = me.mQueue;
// Make sure the identity of this thread is that of the local process,
// and keep track of what that identity token actually is.
Binder.clearCallingIdentity();
final long ident = Binder.clearCallingIdentity();
for (;;) {
//2.获取消息,如果没有消息,会堵塞除非quit
Message msg = queue.next(); // might block
if (msg == null) { //退出循环
// No message indicates that the message queue is quitting.
return;
}
// This must be in a local variable, in case a UI event sets the logger
final Printer logging = me.mLogging;
if (logging != null) {
logging.println(">>>>> Dispatching to " + msg.target + " " +
msg.callback + ": " + msg.what);
}
final long traceTag = me.mTraceTag;
if (traceTag != 0 && Trace.isTagEnabled(traceTag)) {
Trace.traceBegin(traceTag, msg.target.getTraceName(msg));
}
try {
//3.让Handler处理该Message
msg.target.dispatchMessage(msg);
} finally {
if (traceTag != 0) {
Trace.traceEnd(traceTag);
}
}
if (logging != null) {
logging.println("<<<<< Finished to " + msg.target + " " + msg.callback);
}
// Make sure that during the course of dispatching the
// identity of the thread wasn't corrupted.
final long newIdent = Binder.clearCallingIdentity();
if (ident != newIdent) {
Log.wtf(TAG, "Thread identity changed from 0x"
+ Long.toHexString(ident) + " to 0x"
+ Long.toHexString(newIdent) + " while dispatching to "
+ msg.target.getClass().getName() + " "
+ msg.callback + " what=" + msg.what);
}
//回收Message,用于Message复用
msg.recycleUnchecked();
}
}
上面总体就分三步
- final Looper me = myLooper();获取Looper对象,并从对象中获取MessageQueue消息队列。
- Message msg = queue.next();从消息队列中获取消息,当没有消息时,会堵塞,知道消息队列中被插入消息,再返回消息。调用quit退出循环。
- msg.target.dispatchMessage(msg);target实际就是Handler,调用其对应的Handler处理消息.
可以看出,消息循环中,最重要的就是queue.next()了,它是实现堵塞的关键。接下来分析MessageQueue。
3.3 quit(),quitSafely()
quit()方法是直接情况消息队列退出循环,quitSafely()只清空延迟消息。两个方法共同点是不能再接收新的消息了,消息队列为空就退出循环,都是调用MessageQueue的quit方法来实现,具体下面分析。
四. MessageQueue
4.1 enqueueMessage
先看消息的插入,它是在Handler的sendMessageAtTime中被调用,Handler的消息发送最终都是调用了,具体后面分析。
boolean enqueueMessage(Message msg, long when) {
//1.插入的消息Handler是一定不能为空的
if (msg.target == null) {
throw new IllegalArgumentException("Message must have a target.");
}
if (msg.isInUse()) {
throw new IllegalStateException(msg + " This message is already in use.");
}
synchronized (this) {
if (mQuitting) {
IllegalStateException e = new IllegalStateException(
msg.target + " sending message to a Handler on a dead thread");
Log.w(TAG, e.getMessage(), e);
msg.recycle();
return false;
}
msg.markInUse();
msg.when = when;
Message p = mMessages;
boolean needWake;
//2.1.如果头部为空或插入的消息需立即发送,或发送的消息延迟时间小于头,则将这条消息插入头部
if (p == null || when == 0 || when < p.when) {
// New head, wake up the event queue if blocked.
msg.next = p;
mMessages = msg;
//如果之前堵塞的,现在改为唤醒
needWake = mBlocked;
} else {
// Inserted within the middle of the queue. Usually we don't have to wake
// up the event queue unless there is a barrier at the head of the queue
// and the message is the earliest asynchronous message in the queue.
needWake = mBlocked && p.target == null && msg.isAsynchronous();
//2.2.将消息按时间排序插入队列
Message prev;
for (;;) {
prev = p;
p = p.next;
if (p == null || when < p.when) {
break;
}
if (needWake && p.isAsynchronous()) {
needWake = false;
}
}
msg.next = p; // invariant: p == prev.next
prev.next = msg;
}
// We can assume mPtr != 0 because mQuitting is false.
if (needWake) {
nativeWake(mPtr);
}
}
return true;
}
上面的代码重点有两处:
- Handler为空是会抛出异常,一定有处理消息的Handler才可以执行插入。
- 按时间排序的样式插入消息队列。如果消息队列为空,则插入消息,转换堵塞状态唤醒队列。
原来MessageQueue中只拥有消息队列的头,存储队列的数据结构实际上是Message,以单链表的数据结构来存储数据,而MessageQueue用来实现业务逻辑。
对于队列的堵塞和唤醒,是Native部分了。
4.2 Message next()
这部分代码大都有Native实现,实现的逻辑主要是按时间排序取出消息并返回,如果没有则堵塞。
主要的java代码
nativePollOnce(ptr, nextPollTimeoutMillis);
synchronized (this) {
// Try to retrieve the next message. Return if found.
final long now = SystemClock.uptimeMillis();
Message prevMsg = null;
Message msg = mMessages;
if (msg != null && msg.target == null) {
// Stalled by a barrier. Find the next asynchronous message in the queue.
do {
prevMsg = msg;
msg = msg.next;
} while (msg != null && !msg.isAsynchronous());
}
if (msg != null) {
if (now < msg.when) {
// Next message is not ready. Set a timeout to wake up when it is ready.
nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
} else {
//1.获取消息,并返回消息
// Got a message.
mBlocked = false;
if (prevMsg != null) {
prevMsg.next = msg.next;
} else {
mMessages = msg.next;
}
msg.next = null;
if (DEBUG) Log.v(TAG, "Returning message: " + msg);
msg.markInUse();
return msg;
}
} else {
// No more messages.
nextPollTimeoutMillis = -1;
}
//2.退出循环
// Process the quit message now that all pending messages have been handled.
if (mQuitting) {
dispose();
return null;
}
// If first time idle, then get the number of idlers to run.
// Idle handles only run if the queue is empty or if the first message
// in the queue (possibly a barrier) is due to be handled in the future.
if (pendingIdleHandlerCount < 0
&& (mMessages == null || now < mMessages.when)) {
pendingIdleHandlerCount = mIdleHandlers.size();
}
//3.如果不做特别处理,则进行将mBlocked = true,利用nativePollOnce(ptr, nextPollTimeoutMillis)进行堵塞操作
if (pendingIdleHandlerCount <= 0) {
// No idle handlers to run. Loop and wait some more.
mBlocked = true;
continue;
}
if (mPendingIdleHandlers == null) {
mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)];
}
mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers);
}
- mQuitting当消息队列为空时,使用mQuitting来判断是否关闭消息队列并返回空值。这个值是通过调用quit来设置的,也就是Looper的quit方法。
- 当消息队列为空时,如果没有添加IdleHandler接口来进行特别处理,则会设置mBlocked = true,在下一次循环调用nativePollOnce时进行堵塞,直到enqueueMessage插入数据后唤醒。
4.3 void quit(boolean safe)
这个方法会在Looper的quit()和quitSafely()方法中被调用。
quit()调用quit(false),quitSafely()调用quit(true)。
void quit(boolean safe) {
if (!mQuitAllowed) {
throw new IllegalStateException("Main thread not allowed to quit.");
}
synchronized (this) {
if (mQuitting) {
return;
}
//设置退出队列的标示
mQuitting = true;
if (safe) {
removeAllFutureMessagesLocked();
} else {
removeAllMessagesLocked();
}
// We can assume mPtr != 0 because mQuitting was previously false.
nativeWake(mPtr);
}
}
- removeAllFutureMessagesLocked()方法是删除所有的延迟消息,等待立即执行的消息取完,再返回空给Looper,以结束循环。
- removeAllMessagesLocked()方法则是销毁所有的消息,等Looper获取消息时,再返回空给Looper,以结束循环。
在插入消息的代码中,也会以mQuitting来判断是否可以插入消息。
五.Handler
5.1 消息处理
从Looper的loop()方法中,可以看到,最终调用msg.target.dispatchMessage(msg);来处理Message。
/**
* Handle system messages here.
*/
public void dispatchMessage(Message msg) {
//1.Message中的callback
if (msg.callback != null) {
handleCallback(msg);
} else {
//2.创建Handler时的参数Callback
if (mCallback != null) {
if (mCallback.handleMessage(msg)) {
return;
}
}
//3.Handler里的空方法handleMessage,如要使用需要被重新
handleMessage(msg);
}
}
上面代码清晰的显示出响应消息的顺序,
- 首先是Message中带有的Runnable,它是通过Message的obtain获取消息时传入的:
obtain(Handler h, Runnable callback)
- 其次是Handler构造函数传入的Callback处理消息:
Handler(Looper looper, Callback callback, boolean async)
- 覆写Handler的空方法来处理消息:
/**
* Subclasses must implement this to receive messages.
*/
public void handleMessage(Message msg) {
}
对于2和3来说,处理消息的代码和Handler在Handler里是唯一的,而1的处理消息方式可以每次都不同,灵活性更高。
5.2 消息发送
5.2.1 Handler发送消息方法
将消息插入队列,就是调用MessageQueue的enqueueMessage方法进行插入。下面的公共方法sendMessageAtTime和sendMessageAtFrontOfQueue都是调用私有方法enqueueMessage来执行插入。
private boolean enqueueMessage(MessageQueue queue, Message msg, long uptimeMillis) {
msg.target = this;
if (mAsynchronous) {
msg.setAsynchronous(true);
}
return queue.enqueueMessage(msg, uptimeMillis);
}
Handler所有的发送消息最后都是通过这个方法实现消息插入,msg.target = this;这段代码就确认插入的所有Message都拥有这个Handler对象。
5.2.1.1 sendMessageAtTime
上面说过,Handler的sendMessageXXX方法和sendEmptyMessageXXX,最终都是通过
sendMessageAtTime(Message msg, long uptimeMillis)
来实现。
sendEmptyMessage -> sendEmptyMessageDelayed -> sendMessageDelayed -> sendMessageAtTime。
sendMessage -> sendMessageDelayed.
post(Runnable r) - > sendEmptyMessageDelayed.
postAtTime - > sendMessageAtTime.
postDelayed -> sendMessageDelayed
...
5.2.1.2 sendMessageAtFrontOfQueue
还有一个发送消息的方法
public final boolean sendMessageAtFrontOfQueue(Message msg)
顾名思义,将消息插入消息队列的最前端。相关的方法有:
postAtFrontOfQueue - > sendMessageAtFrontOfQueue。
5.2.2 Message发送消息
Message的构造函数是空方法
/** Constructor (but the preferred way to get a Message is to call {@link #obtain() Message.obtain()}).
*/
public Message() {
}
从注释中可以看到,官方推荐obtain(),Message.obtain()来获取Message,这样有利于消息对象复用,避免不必要的对象创建。
/**
* Return a new Message instance from the global pool. Allows us to
* avoid allocating new objects in many cases.
*/
public static Message obtain() {
synchronized (sPoolSync) {
//如果有回收的Message对象,就使用回收的对象
if (sPool != null) {
Message m = sPool;
sPool = m.next;
m.next = null;
m.flags = 0; // clear in-use flag
sPoolSize--;
return m;
}
}
//否则才重新创建对象
return new Message();
}
而对象的回收,在Looper的loop()方法中分析过,在消息处理后,调用msg.recycleUnchecked()来进行回收。
/**
* Recycles a Message that may be in-use.
* Used internally by the MessageQueue and Looper when disposing of queued Messages.
*/
void recycleUnchecked() {
// Mark the message as in use while it remains in the recycled object pool.
// Clear out all other details.
flags = FLAG_IN_USE;
what = 0;
arg1 = 0;
arg2 = 0;
obj = null;
replyTo = null;
sendingUid = -1;
when = 0;
target = null;
callback = null;
data = null;
synchronized (sPoolSync) {
if (sPoolSize < MAX_POOL_SIZE) {
next = sPool;
sPool = this;
sPoolSize++;
}
}
}
它是使用链表进行回收消息的存储,将数据插入表头,通过obtain获取消息时,取表头的消息进行返回,链表中删除这个Message。
来看看Message发送消息的方法
/**
* Sends this Message to the Handler specified by {@link #getTarget}.
* Throws a null pointer exception if this field has not been set.
*/
public void sendToTarget() {
target.sendMessage(this);
}
在调用sendToTarget之前,要调用getTarget来确认Message的target是否有值。空的构造函数并不会赋值target,所有还需要setTarget来设置处理消息的Handler对象。
Message中的发送消息还是通过Handler的sendMessage来实现。
六. 总结
上面的分析是围绕消息的发送到处理来分析的,所以对使用方法没有做详细介绍。但在分析源码后,在使用上却更得心应手了。
如消息处理的先后顺序,Message发送消息的条件,Handler发送消息的方法共同点,退出Looper循环的两个方法的不同点等。
也理解了MessageQueue的存储数据结构实际上是通过Message链表实现;线程和Looper是如何相互绑定一一对应的,ThreadLocal的作用;MessageQueue如何堵塞如何唤醒,队列如何退出;在Looper中处理完事件,会调用recycleUnchecked进行message回收复用等。
网友评论