简介
消息驱动是一种进程/线程的运行模式,内部或者外部的消息事件被放到进程/线程的消息队列中按序处理是现在的操作系统普遍采用的机制.Android也是采用了消息驱动的机制来处理各种外部按键,触屏,系统Intent,广播事件等消息.
Android的消息队列是线程相关的,每启动一个线程,都可以在内部创建一个消息队列,然后在消息队列中不断循环检查是否有新的消息需要处理,如果有,则对该消息进行处理,如果没有,线程就进入休眠状态直到有新的消息需要处理为止.
数据模型
Android中与消息机制相关的类主要有Looper,MessageQueue,Handler,Message,相关的代码主要在以下文件中:
- frameworks/base/core/java/android/os/Looper.java
- frameworks/base/core/java/android/os/Message.java
- frameworks/base/core/java/android/os/MessageQueue.java
- frameworks/base/core/java/android/os/Handler.java
- frameworks/base/core/jni/android_os_MessageQueue.cpp
- system/core/libutils/Looper.cpp
- Looper
Looper对象是用来创建消息队列并进入消息循环处理的.每个线程只能有一个Looper对象,同时对应着一个MessageQueue,发送到该线程的消息都将存放在该队列中,并由Looper循环处理。Android默认只为主线程)(UI线程)创建了Looper,所以当我们新建线程需要使用消息队列时必须手动创建Looper. - MessageQueue
MessageQueue即消息队列,由Looper创建管理,一个Looper对象对应一个MessageQueue对象. - Handler
Handler是消息的接收与处理者,Handler将Message添加到消息队列,同时也通过Handler的回调方法handleMessage()处理对应的消息.一个Handler对象只能关联一个Looper对象,但多个Handler对象可以关联到同一个Looper.默认情况下Handler会关联到实例化Handler线程的Lopper,也可以通过Handler的构造函数的Looper参数指定Handler关联到某个线程的Looper,即发送消息到某个指定线程并在该线程中回调Handler处理该消息. - Message
Message是消息的载体,Parcelable的派生类,通过其成员变量target关联到Handler对象.
它们之间关系如下图示:
在代码中我们一般如下使用线程的消息机制:
class LooperThread extends Thread {
public Handler mHandler;
public void run() {
Looper.prepare();
mHandler = new Handler() {
public void handleMessage(Message msg) {
// process incoming messages here
}
};
Looper.loop();
}
}
线程消息队列的创建
线程的消息队列通过Looper创建并维护的,主线程中调用Looper.prepareMainLooper(),其他子线程中调用Looper.prepare()来创建消息队列.一个线程多次调用prepareMainLooper()或prepare()将会抛出异常.
在介绍消息队列创建之前,首先了解一下Looper与MessageQueue,再看消息队列创建的流程.
- Looper类的主要成员变量与方法如下:
public final class Looper {
static final ThreadLocal<Looper> sThreadLocal = new ThreadLocal<Looper>();
private static Looper sMainLooper;
final MessageQueue mQueue;
final Thread mThread;
public static void prepare() {...}
private static void prepare(boolean quitAllowed) {...}
public static void prepareMainLooper() {...}
public static Looper getMainLooper() {...}
public static void loop() {...}
}
- sThreadLocal是静态成员变量,用于保存线程私有的Looper对象
- sMainLooper是主线程的Looper对象.在prepareMainLooper()中赋值,可通过调用getMainLooper获取
- mQueue即消息队列,在Looper构造函数中初始化
- mThread即Looper所在的线程
- MessageQueue类的主要成员变量与方法如下:
public final class MessageQueue {
private final boolean mQuitAllowed;
private long mPtr;
Message mMessages;
MessageQueue(boolean quitAllowed) {...}
boolean enqueueMessage(Message msg, long when) {...}
Message next() {...}
}
- mQuitAllowed代表是否允许退出消息循环,主线程中默认为false,子线程默认false
- mPtr保存的是NativeMessageQueue的地址,通过该地址就可以找到java层MessageQueue所对应native的MessageQueue.
- mMessages即消息队列,通过mMessages可以遍历整个消息队列
- 消息队列的创建:
消息队列的创建从Looper.prepare()/Looper.prepareMainLooper()开始
public static void prepare() {
prepare(true);
}
public static void prepareMainLooper() {
prepare(false);
synchronized (Looper.class) {
if (sMainLooper != null) {
throw new IllegalStateException("The main Looper has already been prepared.");
}
sMainLooper = myLooper();
}
}
private static void prepare(boolean quitAllowed) {
if (sThreadLocal.get() != null) {
throw new RuntimeException("Only one Looper may be created per thread");
}
sThreadLocal.set(new Looper(quitAllowed));
}
private Looper(boolean quitAllowed) {
mQueue = new MessageQueue(quitAllowed);
mThread = Thread.currentThread();
}
通过调用prepare()或prepareMainLooper()创建Looper对象,然后保存到sThreadLocal中,sThreadLocal是模板类ThreadLocal<T>,它通过线程ID与对象关联的方式实现线程本地存储功能.这样放入sThreadLocal对象中的Looper对象就与创建它的线程关联起来了.所以可以从sThreadLocal中获取到保存的Looper对象:
public static @Nullable Looper myLooper() {
return sThreadLocal.get();
}
主线程的Loopper对象保存在sMainLooper,可以通过getMainLooper获取
public static Looper getMainLooper() {
synchronized (Looper.class) {
return sMainLooper;
}
}
创建Looper同时会创建Looper关联的MessageQueue并赋值给成员变量mQueue,接下来再看new MessageQueue(quitAllowed)的过程:
MessageQueue(boolean quitAllowed) {
mQuitAllowed = quitAllowed;
mPtr = nativeInit();
}
可以看到,直接调用了nativeInit().这个JNI方法定义在android_os_MessageQueue.cpp
static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) {
NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();
if (!nativeMessageQueue) {
jniThrowRuntimeException(env, "Unable to allocate native queue");
return 0;
}
nativeMessageQueue->incStrong(env);
return reinterpret_cast<jlong>(nativeMessageQueue);
}
nativeInit()中首先创建了nativeMessageQueue,然后又将nativeMessageQueue的地址赋值给java层的mPtr,所以java层的MessageQueue就可以通过mPtr找到nativeMessageQueue了.
再看new NativeMessageQueue()过程,NativeMessageQueue的构造如下:
NativeMessageQueue::NativeMessageQueue() : mInCallback(false), mExceptionObj(NULL) {
mLooper = Looper::getForThread();
if (mLooper == NULL) {
mLooper = new Looper(false);
Looper::setForThread(mLooper);
}
}
它首先通过Looper::getForThread()判断当前线程是否已创建过Looper对象,如果没有则创建.注意,这个Looper对象是实现在JNI层的,与上面Java层的Looper是不一样的,不过也是对应的关系.JNI层的Looper对象的创建过程是在Looper.cpp中实现的.
Looper::Looper(bool allowNonCallbacks) :
mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false),
mPolling(false), mEpollFd(-1), mEpollRebuildRequired(false),
mNextRequestSeq(0), mResponseIndex(0), mNextMessageUptime(LLONG_MAX) {
mWakeEventFd = eventfd(0, EFD_NONBLOCK);
LOG_ALWAYS_FATAL_IF(mWakeEventFd < 0, "Could not make wake event fd. errno=%d", errno);
AutoMutex _l(mLock);
rebuildEpollLocked();
}
创建eventfd并赋值给mWakeEventFd,在以前的Android版本上,这里创建的是pipe管道.eventfd是较新的API,被用作一个事件等待/响应,实现了线程之间事件通知.
void Looper::rebuildEpollLocked() {
// Close old epoll instance if we have one.
if (mEpollFd >= 0) {
#if DEBUG_CALLBACKS
ALOGD("%p ~ rebuildEpollLocked - rebuilding epoll set", this);
#endif
close(mEpollFd);
}
// Allocate the new epoll instance and register the wake pipe.
mEpollFd = epoll_create(EPOLL_SIZE_HINT);
LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance. errno=%d", errno);
struct epoll_event eventItem;
memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
eventItem.events = EPOLLIN;
eventItem.data.fd = mWakeEventFd;
int result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeEventFd, & eventItem);
LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake event fd to epoll instance. errno=%d",
errno);
for (size_t i = 0; i < mRequests.size(); i++) {
const Request& request = mRequests.valueAt(i);
struct epoll_event eventItem;
request.initEventItem(&eventItem);
int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, request.fd, & eventItem);
if (epollResult < 0) {
ALOGE("Error adding epoll events for fd %d while rebuilding epoll set, errno=%d",
request.fd, errno);
}
}
}
rebuildEpollLocked中通过epoll_create创建了一个epoll专用的文件描述符,EPOLL_SIZE_HINT表示mEpollFd上能监控的最大文件描述符数.最后调用epoll_ctl监控mWakeEventFd文件描述符的EPOLLIN事件,即当eventfd中有内容可读时,就唤醒当前正在等待的线程.
C++层的这个Looper对象创建好了之后,就返回到JNI层的NativeMessageQueue的构造函数,再返回到Java层的消息队列MessageQueue的创建过程,最后从Looper的构造函数中返回.线程消息队列的创建过程也就此完成.
总结一下:
- 首先在Java层创建了一个Looper对象,然后创建MessageQueue对象mQueue,进入MessageQueue的创建过程
- MessageQueue在JNI层创建了一个NativeMessageQueue对象,并将这个对象保存在MessageQueue的成员变量mPtr中
- 在JNI层,创建了NativeMessageQueue对象时,会创建了一个Looper对象,保存在JNI层的NativeMessageQueue对象的成员变量mLooper中,这个对象的作用是,当Java层的消息队列中没有消息时,就使Android应用程序线程进入等待状态,而当Java层的消息队列中来了新的消息后,就唤醒Android应用程序的线程来处理这个消息
-
关于java层与JNI层的Looper,MessageQueue对象可以这样理解,java层的Looper,MessageQueue主要实现了消息队列发送处理逻辑,而JNI层的主要实现是线程的等待/唤醒.在逻辑上他们还是一一对应的关系,只不过侧重点不同.
java与jni层Looper,MessageQueue关系
线程消息队列的循环
当线程消息队列创建完成后,即进入消息队列循环处理过程中,Android消息队列的循环通过Loop.Loop()来实现,整个流程如下图示.
消息队列循环流程
下面具体来看具体分析
public static void loop() {
final Looper me = myLooper();
if (me == null) {
throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread.");
}
final MessageQueue queue = me.mQueue;
...
for (;;) {
Message msg = queue.next(); // might block
if (msg == null) {
// No message indicates that the message queue is quitting.
return;
}
...
msg.target.dispatchMessage(msg);
...
}
}
进入loop前,首先通过myLooper()拿到前面创建的Looper对象,如果为null将会抛出异常,这也就是为什么必须在Looper.loop()之前调用Looper.prepare()或者Looper.prepareMainLooper()的原因.接下来通过me.mQueue拿到MessageQueue对象,而后进入到无尽循环处理中.在循环中通过queue.next()从队列中取消息,再调用msg.target.dispatchMessage(msg)处理.下面看一下queue.next()流程.
Message next() {
final long ptr = mPtr;
if (ptr == 0) {
return null;
}
int pendingIdleHandlerCount = -1;
int nextPollTimeoutMillis = 0;
for (;;) {
if (nextPollTimeoutMillis != 0) {
Binder.flushPendingCommands();
}
nativePollOnce(ptr, nextPollTimeoutMillis);
synchronized (this) {
final long now = SystemClock.uptimeMillis();
Message prevMsg = null;
Message msg = mMessages;
if (msg != null && msg.target == null) {
do {
prevMsg = msg;
msg = msg.next;
} while (msg != null && !msg.isAsynchronous());
}
if (msg != null) {
if (now < msg.when) {
nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
} else {
mBlocked = false;
if (prevMsg != null) {
prevMsg.next = msg.next;
} else {
mMessages = msg.next;
}
msg.next = null;
if (false) Log.v("MessageQueue", "Returning message: " + msg);
return msg;
}
} else {
nextPollTimeoutMillis = -1;
}
if (mQuitting) {
dispose();
return null;
}
if (pendingIdleHandlerCount < 0
&& (mMessages == null || now < mMessages.when)) {
pendingIdleHandlerCount = mIdleHandlers.size();
}
if (pendingIdleHandlerCount <= 0) {
mBlocked = true;
continue;
}
if (mPendingIdleHandlers == null) {
mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)];
}
mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers);
}
for (int i = 0; i < pendingIdleHandlerCount; i++) {
final IdleHandler idler = mPendingIdleHandlers[i];
mPendingIdleHandlers[i] = null;
boolean keep = false;
try {
keep = idler.queueIdle();
} catch (Throwable t) {
Log.wtf("MessageQueue", "IdleHandler threw exception", t);
}
if (!keep) {
synchronized (this) {
mIdleHandlers.remove(idler);
}
}
}
pendingIdleHandlerCount = 0;
nextPollTimeoutMillis = 0;
}
}
先看一下开始定义的2个变量的含义,pendingIdleHandlerCount表示消息队列空闲消息处理器(IdleHandler)的个数,nextPollTimeoutMillis表示没有消息处理时,线程需睡眠等待的时间.nativePollOnce将会睡眠等待nextPollTimeoutMillis时间.从nativePollOnce返回后,再从消息队列中取消息,如果没有任何消息,那么nextPollTimeoutMillis赋值为-1,表示下一次nativePollOnce无限制等待直到其他线程把它唤醒.如果取到消息,比较消息处理的时间与当前时间,如果消息处理的时间未到(now < msg.when),那么计算nextPollTimeoutMillis,等下一次时间到时再处理.如果消息处理时间已到,那么取出消息返回到Looperde的loop中处理.另外如果当前没有消息处理时,会回调注册的IdleHandler.
下面继续分析nativePollOnce.
static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,
jlong ptr, jint timeoutMillis) {
NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
nativeMessageQueue->pollOnce(env, obj, timeoutMillis);
}
void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {
mPollEnv = env;
mPollObj = pollObj;
mLooper->pollOnce(timeoutMillis);
mPollObj = NULL;
mPollEnv = NULL;
if (mExceptionObj) {
env->Throw(mExceptionObj);
env->DeleteLocalRef(mExceptionObj);
mExceptionObj = NULL;
}
}
最终nativePollOnce调用的JNI层Looper的pollOnce
int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
int result = 0;
for (;;) {
...
if (result != 0) {
...
return result;
}
result = pollInner(timeoutMillis);
}
}
在pollOnce中不断的循环调用pollInner来检查线程是否有新消息需要处理.如果有新消息处理或者timeoutMillis时间到,则返回到java层MessageQueue的next()继续执行.
int Looper::pollInner(int timeoutMillis) {
...
int result = POLL_WAKE;
struct epoll_event eventItems[EPOLL_MAX_EVENTS];
int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
...
for (int i = 0; i < eventCount; i++) {
int fd = eventItems[i].data.fd;
uint32_t epollEvents = eventItems[i].events;
if (fd == mWakeEventFd) {
if (epollEvents & EPOLLIN) {
awoken();
} else {
ALOGW("Ignoring unexpected epoll events 0x%x on wake event fd.", epollEvents);
}
} else {
...
}
}
...
return result;
}
epoll_wait会监听前面创建的epoll实例的文件描述符上的IO读写事件,如果文件描述上没有IO事件出现,那么则等待timeoutMillis延时,检测到EPOLLIN事件即文件描述符上发生了写事件,随后调用awoken读出数据,以便接收新的数据.
void Looper::awoken() {
uint64_t counter;
TEMP_FAILURE_RETRY(read(mWakeEventFd, &counter, sizeof(uint64_t)));
}
在awoken中读出数据.然后一步步返回到java层的MessageQueue继续消息处理.
线程消息的发送
消息的发送是通过Handler来执行的,下面我们从new Handler()开始,一步步分析消息的发送过程
首先看一下Handler类的主要数据成员与方法:
public class Handler {
final MessageQueue mQueue;
final Looper mLooper;
public Handler() {...}
public Handler(Looper looper, Callback callback) {...}
private boolean enqueueMessage(MessageQueue queue, Message msg, long uptimeMillis) {...}
public void handleMessage(Message msg) {...}
public final boolean sendMessage(Message msg){...}
public final boolean sendEmptyMessage(int what){...}
public final boolean sendEmptyMessageAtTime(int what, long uptimeMillis) {...}
public final boolean sendEmptyMessageDelayed(int what, long delayMillis) {...}
public boolean sendMessageAtTime(Message msg, long uptimeMillis) {...}
...
public final boolean post(Runnable r){...}
public final boolean postAtFrontOfQueue(Runnable r){...}
public final boolean postAtTime(Runnable r, long uptimeMillis){...}
public final boolean postDelayed(Runnable r, long delayMillis){...}
}
- mQueue handler对应的MessageQueue对象,通过handler发送的消息都将插入到mQueue队列中
- mLooper handler对应的Looper对象,如果创建Handler前没有实例化Looper对象将抛出异常.
Handler是与Looper对象相关联的,我们创建的Handler对象都会关联到某一Looper,默认情况下,Handler会关联到创建Handler对象所在线程的Looper对象,也可通过Handler的构造函数来指定关联到的Looper.Handler发送消息有二类接口,post类与send类,一般send类用来发送传统带消息ID的消息,post类用来发送带消息处理方法的消息.
下面来看消息发送的具体流程
消息发送流程
Handler或Post类方法最终都会调用enqueueMessage将消息发送到消息队列
private boolean enqueueMessage(MessageQueue queue, Message msg, long uptimeMillis) {
msg.target = this;
if (mAsynchronous) {
msg.setAsynchronous(true);
}
return queue.enqueueMessage(msg, uptimeMillis);
}
Message的成员变量target赋值为this,即关联到handler.然后继续调用MessageQueue的enqueueMessage方法
boolean enqueueMessage(Message msg, long when) {
if (msg.target == null) {
throw new IllegalArgumentException("Message must have a target.");
}
if (msg.isInUse()) {
throw new IllegalStateException(msg + " This message is already in use.");
}
/// M: Add message protect mechanism @{
if (msg.hasRecycle) {
Log.wtf("MessageQueue", "Warning: message has been recycled. msg=" + msg);
return false;
}
/// Add message protect mechanism @}
synchronized (this) {
if (mQuitting) {
IllegalStateException e = new IllegalStateException(
msg.target + " sending message to a Handler on a dead thread");
Log.w("MessageQueue", e.getMessage(), e);
msg.recycle();
return false;
}
msg.markInUse();
msg.when = when;
Message p = mMessages;
boolean needWake;
if (p == null || when == 0 || when < p.when) {
// New head, wake up the event queue if blocked.
msg.next = p;
mMessages = msg;
needWake = mBlocked;
} else {
// Inserted within the middle of the queue. Usually we don't have to wake
// up the event queue unless there is a barrier at the head of the queue
// and the message is the earliest asynchronous message in the queue.
needWake = mBlocked && p.target == null && msg.isAsynchronous();
Message prev;
for (;;) {
prev = p;
p = p.next;
if (p == null || when < p.when) {
break;
}
if (needWake && p.isAsynchronous()) {
needWake = false;
}
}
msg.next = p; // invariant: p == prev.next
prev.next = msg;
}
// We can assume mPtr != 0 because mQuitting is false.
if (needWake) {
nativeWake(mPtr);
}
}
return true;
}
MessageQueue中的enqueueMessage主要工作是将message插入到队列,然后根据情况判断是否应该调用nativeWake唤醒目标线程.当前队列为空或者插入消息处理时间延时为0或者处理时间小于队头处理时间时,消息被插入到头部,否则按时间遍历插入到对应位置,并设置needWake标志,needWake是根据mBlocked来判断的,mBlocked记录了当前线程是否处于睡眠状态,如果消息插入队头且线程在睡眠中,neeWake为true,调用nativeWake唤醒目标线程.
static void android_os_MessageQueue_nativeWake(JNIEnv* env, jclass clazz, jlong ptr) {
NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
return nativeMessageQueue->wake();
}
void NativeMessageQueue::wake() {
mLooper->wake();
}
void Looper::wake() {
uint64_t inc = 1;
ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd, &inc, sizeof(uint64_t)));
if (nWrite != sizeof(uint64_t)) {
if (errno != EAGAIN) {
ALOGW("Could not write wake signal, errno=%d", errno);
}
}
}
nativeWake最终会调用到jni层的Looper对象的wake方法中,Looper wake方法的实现非常简单,即向mWakeEventFd写入一个uint64_t,这样目标线程就会因为mWakeEventFd发生的IO事件而唤醒.消息的发送流程就此结束.
线程消息的处理
从前面的分析可以知道,当线程没有消息需要处理时,会在c++层Looper对象的pollInner中进入睡眠等待,当有新消息唤醒该目标线程时或这延时时间到,执行流程将沿着pollInner调用路径一直返回,直到java层Looper类的loop.
消息处理流程
loop中将调用msg.target.dispatchMessage(msg)处理消息,这里的msg.target就是上面enqueueMessage中所赋值的handler,即进入handler的dispatchMessage处理消息
public void dispatchMessage(Message msg) {
if (msg.callback != null) {
handleCallback(msg);
} else {
if (mCallback != null) {
if (mCallback.handleMessage(msg)) {
return;
}
}
handleMessage(msg);
}
}
dispatchMessage进行消息处理,先检查是否有设置msg.callback,如果有则执行msg.callback处理消息,如果没有则继续判断mCallback的执行,最后才是handleMessage处理.
网友评论