为什么需要消息通信?
Android的主线程即UI线程是非安全线程,只允许UI线程更新UI状态。其它任务线程处理完事务后期望更新UI状态,只能发消息通知UI线程进行更新操作。假如多个线程同时通知UI线程更新状态势必造成冲突,会导致更新事件的丢失。通过增加消息池将收到的消息缓存起来,如此能够保证收到的各个通知信息得到执行。
消息驱动机制

- Runnalbe和Message可以被压入到MessageQueue中,形成一个集合。注意一般情况下某种类型的MessageQueue只允许保存相同类型的对象,图中为了叙述方便将它们混放在同一个MessageQueue中,实际源码对Runnable进行相应的转换。
- Looper循环的取出消息,然后传给Handler进行处理,如此循环往复。假如队列为空,那么它会进入休眠。
- Handler是真正“处理事情”的地方。
可以看出,上面的几个对象是缺一不可的,它们各司其职,就像一台计算机中CPU的工作方式:中央处理器(Looper)不断的从内存(MessageQueue)中读取指令(Message),执行指令(Handler),最终产生结果。Handler和Looper在同一个线程。
各部分介绍
消息--Message
消息用来表示一个可执行的任务,通常在消息内部会封装标识、执行时间和数据。Message本身是一个单向链表结构。
public final class Message implements Parcelable{
public int what; // 消息码
public int arg1;
public int arg2;
public Object obj; // 传递的对象
public Messenger replyTo;
long when; // 指定消息执行的时间
Bundle data;
Handler target; // 目标Handler
Runnable callback;
Message next; // 下一条消息的引用
private static final Object sPoolSync = new Object(); // 消息池的锁
private static Message sPool; // 消息池
private static int sPoolSize = 0; // 消息池当前大小
private static final int MAX_POOL_SIZE = 50; // 消息池最大值
public static Message obtain(){
synchronized(sPoolSync){
if(sPool != null){
Message m = sPool; // 指向消息池头部
sPool = m.next; // 消息池头部指向下一条消息
m.next = null;
sPoolSize--; // 消息池大小减1
return m;
}
return new Message(); // 消息池为空时创建一个消息
}
}
// 代码省略
public void recycle(){
clearForRecycle(); // 清空消息记录
synchronized(sPoolSync){
if(sPoolSize < MAX_POOL_SIZE){
next = sPool;
sPool = this; // 将当前消息加入到消息池头部
sPoolSize++; // 消息池当前消息数加1
}
}
}
}
说明: Message实现了Parcelable接口,因此Message对象可以写入Parcel中,也可以从Parcel中还原已写入的对象,这相当于Java的序列号和反序列化机制。可序列化保证Message对象可以通过Intent或者Binder远程传播。
消息队列--MessageQueue
public class MessageQueue{
private final boolean mQuitAllowed;
private int mPtr; // used by native code
...
private natvie void nativeInit();
MessageQueue(boolean quiteAllowed){
mQuitAllowed = quitAllowed;
nativeInit();
}
boolean enqueueMessage(Message msg, long when) {
// 入队的消息必须制定target
if (msg.target == null) {
throw new IllegalArgumentException("Message must have a target.");
}
// 判断该消息的FLAG_IN_USE标记是否被设置,新消息不会设置该标记位
if (msg.isInUse()) {
throw new IllegalStateException(msg + " This message is already in use.");
}
synchronized (this) {
// 表示处理消息的目标端Handler所在线程已经异常退出
if (mQuitting) {
IllegalStateException e = new IllegalStateException(
msg.target + " sending message to a Handler on a dead thread");
Log.w(TAG, e.getMessage(), e);
msg.recycle();
return false;
}
msg.markInUse();
// 记录消息何时开始处理
msg.when = when;
// 消息队列头部,p相当于工作指针,指向当前消息
Message p = mMessages;
boolean needWake;
/*
* 对应三种情况:(1) 消息队列为空;(2)新消息需要立刻处理;
* (3)新消息处理时间早于消息队列头部消息的处理时间;
*/
if (p == null || when == 0 || when < p.when) {
// New head, wake up the event queue if blocked.
msg.next = p; //消息队列头部的消息后移
mMessages = msg; // 将新消息插入消息队列头部,这样可以立即处理
needWake = mBlocked;
} else {
// 对应第四种情况:新消息处理时间晚于消息队列头部的处理时间
needWake = mBlocked && p.target == null && msg.isAsynchronous();
Message prev;
// 根据消息处理时间,检索消息应该插入消息队列哪个位置
for (;;) {
prev = p;
p = p.next;
// 循环退出条件:到达消息队列末尾或者在消息队列中找到一个处理时间大
// 于新消息处理时间的消息节点
if (p == null || when < p.when) {
break;
}
if (needWake && p.isAsynchronous()) {
needWake = false;
}
}
msg.next = p; // 找到新消息插入位置
prev.next = msg;
}
// We can assume mPtr != 0 because mQuitting is false.
if (needWake) {
nativeWake(mPtr);
}
}
return true;
}
}
说明:
1)成员变量mQuitAllowed表示当前消息循环是否允许退出,nativeInit会在本地创建一个NativeMessageQueue对象,然后直接赋给MessageQueue中的成员变量mPtr。
2)新消息加入消息队列时,如果新消息加入到消息队列头部,且此时处理消息的线程处于block状态,则需要调用nativeWake方法唤醒处理线程。JNI层是通过write系统调用向管道写入“W”字符串,这样处理消息的线程会因为I/O事件而被唤醒。
线程处于空闲状态符合下面两种情景:
1)线程的消息队列为空;
2)消息队列头部消息的处理时间未到;
空闲消息处理函数用于在线程暂无消息处理时做一些辅助工作,其重要应用之一是在GcIdler中完成空闲时内存垃圾回收,另个重要应用是在Home启动后进入空闲状态是发送BOOT_COMPLETED广播。
常见的方法:
- 元素入队
final boolean enqueueMessage(Message msg, long when);
- 元素出队
final Message next();
- 删除元素
final void removeMessage(Handlder h, int what, Object object);
final void removeMessage(Handler h, Runnable r, Object object);
- 销毁队列
通过本地函数nativeDestory来销毁一个MessageQueue;
消息循环--Looper
主线程Looper
/*frameworks/base/core/java/android/app/ActivityThread.java*/
public static void main(String[] args){
...
Looper.prepareMainLooper();
ActivityThread thread = new ActivityThread();
thread.attach(false);
if(sMainThreadHandler == null){
sMainThreadHandler = thread.getHander();
}
AsyncTask.init();
Looper.loop();
}
/*frameworks/base/core/java/android/os/Looper.java*/
public static void prepareMainLooper(){
// 线程不允许退出
prepare(false);
synchronized(Looper.class){
if(sMainLooper != null){
throw new IllegalStateException("The main Looper has already been prepared");
}
sMainLooper = myLooper();
}
}
说明:prepareMainLooper需要用prepare,经过prepare后myLooper就可以得到一个本地线程<ThreadLocal>的Looper对象,最后赋值给sMainLooper。为了区分普通Looper,使用getMainLooper()获取主线程的Looper对象。
普通Looper线程
class LooperThread extends Thread{
public Handler mHandler;
public void run(){
// Looper线程准备阶段
Looper.prepare();
mHandler = new Handler(){
public void handleMessage(Message msg){
// 处理具体消息
}
};
// Looper线程循环阶段
Looper.loop();
}
}
Looper循环
public static void loop(){
Looper me = myLooper();
if(me == null){
throw new RuntimeException("No Looper; Looper.prepare() wasn't
called on this thread.");
}
MessageQueue queue = me.mQueue;
...
while(true){
Message msg = queue.next();
if(msg != null){
if(msg.target == null){
// No target is a magic identifier for the quit message.
return;
}
...
msg.target.dispatchMessage(msg); // 处理消息
...
msg.recycle(); // 回收消息
}
}
}
loop方法实质就是建立一个死循环,然后通过从消息队列中逐个取出消息,最后处理消息的过程。
消息处理器--Handler
Handler与Looper、MessageQueue关联
public class Hander{
...
final MessageQueue mQueue;
final Looper mLooper;
final Callback mCallback;
IMessage mMessage; // 用于跨进程的消息发送
public Handler(){
// 通过sThreadLocal.get()来获取当前线程中的Looper实例
mLooper = Looper.myLooper();
if(mLooper == null){
throw new RuntimeException("Can't create handler inside thread that has not called Looper.prepare()");
}
mQueue = mLooper.mQueue;
mCallback = null;
}
}
注意:使用UI线程的Looper,但是在子线程中创建Handler会抛出异常?因为Looper对象是ThreadLocal的,即每个线程都有自己的Looper,这个Looper可以为空。当在子线程中创建Handler对象时,此子线程的Looper为空则会抛出异常。
发送消息与处理消息
发送消息
- post消息
final boolean post(Runnable r);
final boolean postDelayed(Runnable r, long delayMillis);
final boolean postAtTime(Runnable r, long uptimeMillis);
- send消息
final boolean sendMessage(Message msg);
final boolean sendEmptyMessage(int what);
final boolean sendEmptyAtFrontOfQueue(Message msg);
boolean sendMessageAtTime(Message msg, long uptimeMillis);
final boolean sendMessageDelayed(Message msg, long delayMillis);
Post发送的信息需转换成Message,再调用Send系列函数发送出去。内部转换方法如下:
public final boolean post(Runnable r){
return sendMessageDelayed(getPostMessage(r), 0);
}
private static Message getPostMessage(Runnable r){
Message m = Message.obtain();
// 将Runnable对象设置为Message的回调函数
m.callback = r;
return m;
}
处理消息
public void dispatchMessage(Message msg){
if(msg.callback != null){ // 首先匹配消息中指定的回调方法
handleCallback(msg);
}else{
if(mCallback != null){ // 然后匹配创建Handler时指定的回调方法
if(mCallback.handleMessage(msg)){
return;
}
}
handleMessage(msg); // 最后匹配Handler的handleMessage方法
}
}
从MessageQueue获取到Message通过dispatchMessage(Message msg)进行分发,处理优先级:消息中携带的Runnable对象 --> 创建Handler时指定的回调方法 --> 重载的handleMessage方法。
由此可见,Handler的拓展子类可以通过重载dispatchMessage或者handleMessage来改变它的默认行为。
AsyncQueryHandler
使用场景
AsyncQueryHandler是一个抽象类,主要是用来在异步线程中操作数据库,当访问结束后通知界面更新。
使用方式
使用起来相当方便,只需要两步:
1)继承AsyncQueryHandler类,并实现onXXXComplete方法;
onXXXComplete方法中主要是处理数据库操作结果,用以完成界面更新。如果不处理结果的话,也可以不实现onXXXComplete方法。
2)利用AsyncQueryHandler实例直接调用startxxx()方法;
各个参数说明:
名称 | 含义 |
---|---|
token | 令牌,同一个AsyncQueryHandler类对象中,startQuery()和onQueryComplete()方法的token应该是一致的。 |
cookie | 在startQuery()中传入,想在onQueryComplete()中使用的对象,没有的话传递null即可。 |
uri | 操作的数据库对应的uri |
projection | 想要查询的列 |
selection | 限制条件 |
selectionArgs | 限制条件的具体值 |
orderBy | 排序条件 |
实现原理
AsyncQueryHandler就是利用分别在主线程和异步线程中的两个handler来实现异步访问数据库的功能的。其工作过程如下图所示[3]:

WorkHandler
WorkHandler是一个普通的Handler的子类,主要看下handlerMessage方法:
@Override
public void handleMessage(Message msg) {
final ContentResolver resolver = mResolver.get();
if (resolver == null) return;
WorkerArgs args = (WorkerArgs) msg.obj;
int token = msg.what;
int event = msg.arg1;
switch (event) {
case EVENT_ARG_QUERY:
Cursor cursor;
try {
cursor = resolver.query(args.uri, args.projection,
args.selection, args.selectionArgs,
args.orderBy);
// Calling getCount() causes the cursor window to be filled,
// which will make the first access on the main thread a lot faster.
if (cursor != null) {
cursor.getCount();
}
} catch (Exception e) {
Log.w(TAG, "Exception thrown during handling EVENT_ARG_QUERY", e);
cursor = null;
}
args.result = cursor;
break;
case EVENT_ARG_INSERT:
args.result = resolver.insert(args.uri, args.values);
break;
case EVENT_ARG_UPDATE:
args.result = resolver.update(args.uri, args.values, args.selection,
args.selectionArgs);
break;
case EVENT_ARG_DELETE:
args.result = resolver.delete(args.uri, args.selection, args.selectionArgs);
break;
}
// args.handler即为AsyncQueryHandler
Message reply = args.handler.obtainMessage(token);
reply.obj = args;
reply.arg1 = msg.arg1;
reply.sendToTarget();
}
说明:对数据库的增删改查操作放在WorkHandler对象所在的异步线程中,在handlerMessage方法的最后将处理结果封装到Message对象中,发送给AsyncQueryHandler进行处理。
AsyncQueryHandler
AsyncQueryHandler位于主线程中,负责对异步线程处理后的结果进行处理,其代码如下:
public void handleMessage(Message msg) {
WorkerArgs args = (WorkerArgs) msg.obj;
if (localLOGV) {
LogUtil.i(TAG, "QueryHandler.handleMessage: msg.what=" + msg.what
+ ", msg.arg1=" + msg.arg1);
}
int token = msg.what;
int event = msg.arg1;
// pass token back to caller on each callback.
switch (event) {
case EVENT_ARG_QUERY:
onQueryComplete(token, args.cookie, (Cursor) args.result);
break;
case EVENT_ARG_BULK_INSERT:
onBulkInsertComplete(token, args.cookie, (Integer) args.result);
break;
case EVENT_ARG_INSERT:
onInsertComplete(token, args.cookie, (Uri) args.result);
break;
case EVENT_ARG_UPDATE:
onUpdateComplete(token, args.cookie, (Integer) args.result);
break;
case EVENT_ARG_DELETE:
onDeleteComplete(token, args.cookie, (Integer) args.result);
break;
}
}
说明:AsyncQueryHandler的handlerMessage中,调用onXXXComplete方法,就可以完成界面的一些更新以及其它不耗时操作。
参考资料
[1] 深入理解Android内核设计思想,林学森
[2] Android的设计与实现-卷I,杨云君
[3] AsyncQueryHandler详解及使用,(https://blog.csdn.net/weixin_42193691/article/details/82469627)
网友评论