Android's message handling is essentially an asynchronous processing mechanism. An application is driven by messages: the system maintains a message queue for the application's main thread, and that thread keeps pulling messages out of the queue (Looper) and processing them (Handler), which is what drives the application forward. In the Android source, when ActivityManagerService communicates with an Activity or Service, it notifies the application over Binder; the application does not handle the notification immediately, but wraps the request into a Message, puts it into the application's MessageQueue, and lets the message loop (Looper) process it later. This is a concrete instance of the message-loop model. The benefit is that the sender only has to drop the message into the application's message queue and can return right away to do other work, instead of waiting for the receiver to finish processing the message, which improves the system's concurrency.
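To make the producer/consumer decoupling concrete, here is a minimal sketch. The class name and message code are made up for illustration; HandlerThread, Handler and Message are the real android.os APIs.

import android.os.Handler;
import android.os.HandlerThread;
import android.os.Message;

public class MessageDrivenSketch {
    private static final int MSG_DO_WORK = 1; // hypothetical message code

    public void demo() {
        // The consumer: a thread that owns a Looper and drains its MessageQueue.
        HandlerThread worker = new HandlerThread("worker");
        worker.start();

        Handler handler = new Handler(worker.getLooper()) {
            @Override
            public void handleMessage(Message msg) {
                if (msg.what == MSG_DO_WORK) {
                    // Runs on the worker thread whenever its Looper dequeues the message.
                    android.util.Log.d("Sketch", "processing " + msg.arg1);
                }
            }
        };

        // The producer: enqueue and return immediately, without waiting
        // for the consumer to finish processing the message.
        handler.obtainMessage(MSG_DO_WORK, /* arg1 */ 42, /* arg2 */ 0).sendToTarget();
    }
}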
The analysis below is based on the Android 5.0+ source; the differences from Android 4.0 are covered at the end.
public final class ActivityThread {
public static void main(String[] args) {
SamplingProfilerIntegration.start();
..........
Process.setArgV0("");
Looper.prepareMainLooper();
ActivityThread thread = new ActivityThread();
thread.attach(false);
if (sMainThreadHandler == null) {
sMainThreadHandler = thread.getHandler();
}
AsyncTask.init();
if (false) {
Looper.myLooper().setMessageLogging(new LogPrinter(Log.DEBUG, "ActivityThread"));
}
Looper.loop();
throw new RuntimeException("Main thread loop unexpectedly exited");
}
Now let's see what Looper.prepareMainLooper() actually does:
public static void prepareMainLooper() {
prepare(false);
synchronized (Looper.class) {
if (sMainLooper != null) {
throw new IllegalStateException("The main Looper has already been prepared.");
}
sMainLooper = myLooper();/*sThreadLocal.get()*/
}
}
private static void prepare(boolean quitAllowed) {
if (sThreadLocal.get() != null) {
throw new RuntimeException("Only one Looper may be created per thread");
}
sThreadLocal.set(new Looper(quitAllowed));
}
As we can see, the main job of these functions is to create a Looper object and store it in the calling thread's private storage. sThreadLocal can be thought of as thread-local storage: it is not shared across threads, and each thread gets exactly one slot (and, as an aside, wherever there is an sThreadLocal.get() there must be a matching sThreadLocal.set()). This guarantees that every thread that calls Looper.prepare() (or, on the main thread, Looper.prepareMainLooper()) gets its own independent Looper object. The error we often run into during development,
java.lang.RuntimeException: Can't create handler inside thread that has not called Looper.prepare()
is thrown precisely because a Handler was created on a child thread that never called Looper.prepare(), so that thread has no Looper. The UI (main) thread never hits this, because the system runs ActivityThread.main(), which calls prepareMainLooper() and creates the Looper automatically. The classic worker-thread pattern that avoids this error is sketched below; after that, let's focus on Looper and MessageQueue.
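A minimal sketch of that pattern, assuming a plain Thread subclass (the class name is ours; the Looper and Handler calls are the real APIs):

import android.os.Handler;
import android.os.Looper;
import android.os.Message;

public class LooperThread extends Thread {
    public Handler handler;

    @Override
    public void run() {
        Looper.prepare();                 // creates this thread's Looper and MessageQueue
        handler = new Handler(Looper.myLooper()) {
            @Override
            public void handleMessage(Message msg) {
                // handled on this worker thread
            }
        };
        Looper.loop();                    // blocks here, dispatching messages until quit()
    }
}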
public final class Looper {
static final ThreadLocal<Looper> sThreadLocal = new ThreadLocal<Looper>();
private static Looper sMainLooper; // guarded by Looper.class
final MessageQueue mQueue;
.............
private Looper(boolean quitAllowed) {
mQueue = new MessageQueue(quitAllowed);
mThread = Thread.currentThread();
}
}
public final class MessageQueue {
@SuppressWarnings("unused")
private long mPtr; // used by native code
Message mMessages;
......
MessageQueue(boolean quitAllowed) {
mQuitAllowed = quitAllowed;
mPtr = nativeInit(); /* JNI call */
}
}
/*android_os_MessageQueue.cpp*/
static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) {
NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();
if (!nativeMessageQueue) {
jniThrowRuntimeException(env, "Unable to allocate native queue");
return 0;
}
nativeMessageQueue->incStrong(env);
return reinterpret_cast<jlong>(nativeMessageQueue);
}
/* frameworks/base/core/jni/android_os_MessageQueue.cpp */
NativeMessageQueue::NativeMessageQueue() : mInCallback(false), mExceptionObj(NULL) {
mLooper = Looper::getForThread();
if (mLooper == NULL) {
mLooper = new Looper(false);
Looper::setForThread(mLooper);
}
}
/* /system/core/libutils/Looper.cpp */
Looper::Looper(bool allowNonCallbacks) :
mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false),
mPolling(false), mEpollFd(-1), mEpollRebuildRequired(false),
mNextRequestSeq(0), mResponseIndex(0), mNextMessageUptime(LLONG_MAX) {
mWakeEventFd = eventfd(0, EFD_NONBLOCK);
LOG_ALWAYS_FATAL_IF(mWakeEventFd < 0, "Could not make wake event fd. errno=%d", errno);
AutoMutex _l(mLock);
rebuildEpollLocked();
}
void Looper::rebuildEpollLocked() {
// Close old epoll instance if we have one.
if (mEpollFd >= 0) {
#if DEBUG_CALLBACKS
ALOGD("%p ~ rebuildEpollLocked - rebuilding epoll set", this);
#endif
close(mEpollFd);
}
// Allocate the new epoll instance and register the wake pipe.
mEpollFd = epoll_create(EPOLL_SIZE_HINT);
LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance. errno=%d", errno);
struct epoll_event eventItem;
memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
eventItem.events = EPOLLIN;
eventItem.data.fd = mWakeEventFd;
int result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeEventFd, & eventItem);
LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake event fd to epoll instance. errno=%d",errno);
.........
}
The Looper constructor news up a MessageQueue and keeps it in its mQueue field; MessageQueue in turn creates a NativeMessageQueue and records its native address in mPtr. In the native Looper there are only three calls we need to care about:
1.mWakeEventFd = eventfd(0, EFD_NONBLOCK);
2.mEpollFd = epoll_create(EPOLL_SIZE_HINT);
3.epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeEventFd, & eventItem);
Readers familiar with Linux C will recognize the epoll mechanism and the eventfd event counter here
(see http://www.jianshu.com/p/27befc4dd33a for background on eventfd and epoll).
What happens here is the core of how Handler implements delayed delivery, so it is very important.
First, eventfd() creates a file descriptor; then, in rebuildEpollLocked(), epoll_ctl() tells mEpollFd to watch mWakeEventFd for EPOLLIN events, so that as soon as something is written to mWakeEventFd, epoll_wait() wakes up.
At this point the groundwork for Handler is complete: the thread owns a Java Looper and MessageQueue, a NativeMessageQueue, and an epoll instance watching mWakeEventFd.
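Before diving into the loop itself, here is a quick sketch of what this setup buys us: once the main Looper exists, any thread can hand work to the main thread through a Handler bound to it. The class name is ours; the calls are standard android.os APIs.

import android.os.Handler;
import android.os.Looper;

public class MainThreadPoster {
    // Bound to the main thread's Looper that prepareMainLooper() created.
    private final Handler mainHandler = new Handler(Looper.getMainLooper());

    public void postToMainThread(Runnable work) {
        // Enqueues on the main thread's MessageQueue and returns immediately.
        mainHandler.post(work);
    }
}

With the setup in place, let's look at how Looper.loop() drains the queue.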
/* Looper.java */
public static void loop() {
final Looper me = myLooper(); // get the Java-level Looper for the current thread
..............
/* get the MessageQueue held by this Looper */
final MessageQueue queue = me.mQueue;
for (;;) {
/* When a Message is sent through a Handler, it is actually inserted
   into this queue, kept sorted by delivery time */
Message msg = queue.next(); // might block
if (msg == null) {
// No message indicates that the message queue is quitting.
return;
}
....................
msg.target.dispatchMessage(msg);
....................
}
}
/* MessageQueue.java*/
Message next() {
// Return here if the message loop has already quit and been disposed.
// This can happen if the application tries to restart a looper after quit
// which is not supported.
final long ptr = mPtr;
if (ptr == 0) {
return null;
}
int pendingIdleHandlerCount = -1; // -1 only during first iteration
int nextPollTimeoutMillis = 0;
for (;;) {
if (nextPollTimeoutMillis != 0) {
Binder.flushPendingCommands();
}
nativePollOnce(ptr, nextPollTimeoutMillis);
/* walk the message queue for the next due message */
synchronized (this) {
// Try to retrieve the next message. Return if found.
final long now = SystemClock.uptimeMillis();
Message prevMsg = null;
Message msg = mMessages;
if (msg != null && msg.target == null) {
// Stalled by a barrier. Find the next asynchronous message in the queue.
do {
prevMsg = msg;
msg = msg.next;
} while (msg != null && !msg.isAsynchronous());
}
if (msg != null) {
if (now < msg.when) {
// Next message is not ready. Set a timeout to wake up when it is ready.
/* compute how long to block until the next message is due */
nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
} else {
// Got a message.
mBlocked = false;
if (prevMsg != null) {
prevMsg.next = msg.next;
} else {
mMessages = msg.next;
}
msg.next = null;
if (DEBUG) Log.v(TAG, "Returning message: " + msg);
msg.markInUse();
return msg;
}
} else {
// No more messages.
nextPollTimeoutMillis = -1;
}
.........................
// While calling an idle handler, a new message could have been delivered
// so go back and look again for a pending message without waiting.
nextPollTimeoutMillis = 0;
}
}
/*Looper.cpp*/
int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
int result = 0;
for (;;) {
......
#if DEBUG_POLL_AND_WAKE
.......
#endif
......
result = pollInner(timeoutMillis);
}
}
int Looper::pollInner(int timeoutMillis)
{
......
// Adjust the timeout based on when the next message is due.
if (timeoutMillis != 0 && mNextMessageUptime != LLONG_MAX) {
nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
int messageTimeoutMillis = toMillisecondTimeoutDelay(now, mNextMessageUptime);
if (messageTimeoutMillis >= 0
&& (timeoutMillis < 0 || messageTimeoutMillis < timeoutMillis)) {
timeoutMillis = messageTimeoutMillis;
}
}
// Poll.
int result = POLL_WAKE;
mResponses.clear();
mResponseIndex = 0;
// We are about to idle.
mPolling = true;
struct epoll_event eventItems[EPOLL_MAX_EVENTS];
/******************* the key epoll_wait call **************/
int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
// No longer idling.
mPolling = false;
// Acquire lock.
mLock.lock();
// Rebuild epoll set if needed.
if (mEpollRebuildRequired) {
mEpollRebuildRequired = false;
rebuildEpollLocked();
goto Done;
}
// Check for poll error.
if (eventCount < 0) {
if (errno == EINTR) {
goto Done;
}
ALOGW("Poll failed with an unexpected error, errno=%d", errno);
result = POLL_ERROR;
goto Done;
}
// Check for poll timeout.
if (eventCount == 0) {
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - timeout", this);
#endif
result = POLL_TIMEOUT;
goto Done;
}
// Handle all events.
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - handling events from %d fds", this, eventCount);
#endif
for (int i = 0; i < eventCount; i++) {
int fd = eventItems[i].data.fd;
uint32_t epollEvents = eventItems[i].events;
if (fd == mWakeEventFd) {
if (epollEvents & EPOLLIN) {
awoken();
} else {
ALOGW("Ignoring unexpected epoll events 0x%x on wake event fd.", epollEvents);
}
} else {
ssize_t requestIndex = mRequests.indexOfKey(fd);
if (requestIndex >= 0) {
int events = 0;
if (epollEvents & EPOLLIN) events |= EVENT_INPUT;
if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT;
if (epollEvents & EPOLLERR) events |= EVENT_ERROR;
if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP;
pushResponse(events, mRequests.valueAt(requestIndex));
} else {
ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "
"no longer registered.", epollEvents, fd);
}
}
}
Done: ;
......
return result;
}
[Figure: the Looper loop flow]
To sum up: the Handler message loop is built on eventfd and epoll. The thread blocks inside epoll_wait(), and as soon as something is written to the eventfd, or the timeout passed to epoll_wait() expires, execution continues and the next message is read.
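Here is a rough Java analogy of that block-until-ready-or-timeout behavior. This is not how the framework implements it (the real blocking happens in native epoll_wait()); it is just a sketch of the shape using a BlockingQueue.

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class BlockingLoopAnalogy {
    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();

    // Plays the role of Looper.loop() + MessageQueue.next().
    public void loop() throws InterruptedException {
        for (;;) {
            // Block for at most 100 ms, like epoll_wait(..., nextPollTimeoutMillis).
            Runnable task = queue.poll(100, TimeUnit.MILLISECONDS);
            if (task != null) {
                task.run();   // roughly "dispatchMessage"
            }
            // On timeout we simply loop again, like the POLL_TIMEOUT case in pollInner().
        }
    }

    // Plays the role of enqueueMessage() + wake(): adding an item wakes the blocked poll().
    public void post(Runnable task) {
        queue.offer(task);
    }
}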
Now let's look at how Handler sends a message and wakes the thread up.
First, two key member fields of Handler:
final MessageQueue mQueue; // the Java MessageQueue created along the Looper.prepareMainLooper() path
final Looper mLooper;      // the Java Looper created by Looper.prepareMainLooper()
[Figure: the Handler message-sending flow]
/* MessageQueue.java */
boolean enqueueMessage(Message msg, long when) {
if (msg.target == null) {
throw new IllegalArgumentException("Message must have a target.");
}
if (msg.isInUse()) {
throw new IllegalStateException(msg + " This message is already in use.");
}
synchronized (this) {
if (mQuitting) {
IllegalStateException e = new IllegalStateException(
msg.target + " sending message to a Handler on a dead thread");
Log.w(TAG, e.getMessage(), e);
msg.recycle();
return false;
}
// insert the message into the time-ordered queue -- start
msg.markInUse();
msg.when = when;
Message p = mMessages;
boolean needWake;
if (p == null || when == 0 || when < p.when) {
// New head, wake up the event queue if blocked.
msg.next = p;
mMessages = msg;
needWake = mBlocked;
} else {
// Inserted within the middle of the queue. Usually we don't have to wake
// up the event queue unless there is a barrier at the head of the queue
// and the message is the earliest asynchronous message in the queue.
needWake = mBlocked && p.target == null && msg.isAsynchronous();
Message prev;
for (;;) {
prev = p;
p = p.next;
if (p == null || when < p.when) {
break;
}
if (needWake && p.isAsynchronous()) {
needWake = false;
}
}
msg.next = p; // invariant: p == prev.next
prev.next = msg;
}
// insert the message into the time-ordered queue -- end
// We can assume mPtr != 0 because mQuitting is false.
if (needWake) {
nativeWake(mPtr);
}
}
return true;
}
A message sent through a Handler is first inserted into the singly linked list that forms the message queue, ordered by its delivery time (when). If the new message ends up at the head of a blocked queue (or is the earliest asynchronous message behind a barrier), needWake is true and nativeWake(mPtr) is called.
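A small usage sketch of the time ordering this maintains (Handler.postDelayed() computes when = SystemClock.uptimeMillis() + delayMillis before calling enqueueMessage()); the class name is ours, the calls are standard Handler API:

import android.os.Handler;
import android.os.Looper;
import android.util.Log;

public class DelayOrderingSketch {
    public void demo() {
        Handler handler = new Handler(Looper.getMainLooper());

        handler.postDelayed(() -> Log.d("Sketch", "runs second"), 2000);
        handler.postDelayed(() -> Log.d("Sketch", "runs first"), 500);
        // Both calls return immediately. The 500 ms Runnable gets the smaller
        // "when" and is inserted ahead of the 2000 ms one, so it is dispatched first.
    }
}

When a wakeup is needed, nativeWake(mPtr) ends up in the wake() function of Looper.cpp: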
void Looper::wake() {
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ wake", this);
#endif
uint64_t inc = 1;
ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd, &inc, sizeof(uint64_t)));
if (nWrite != sizeof(uint64_t)) {
if (errno != EAGAIN) {
ALOGW("Could not write wake signal, errno=%d", errno);
}
}
}
Surprised? The system simply writes a number to mWakeEventFd. The value itself carries no meaning; its only job is to wake the thread. It tells epoll_wait() that mWakeEventFd now has something to read, so epoll_wait() stops blocking, the thread wakes up, and the message is returned step by step up the call chain, finally reaching:
/*Looper.java*/
public static void loop() {
......
for (;;) {
Message msg = queue.next(); // might block
if (msg == null) {
// No message indicates that the message queue is quitting.
return;
}
msg.target.dispatchMessage(msg);
......
}
/* Handler.java */
public void dispatchMessage(Message msg) {
if (msg.callback != null) {
handleCallback(msg);
} else {
if (mCallback != null) {
if (mCallback.handleMessage(msg)) {
return;
}
}
handleMessage(msg);
}
}
msg.target is the Handler we created, so its dispatchMessage() runs, and from there it is the familiar Handler flow: if the message carries a Runnable callback it is run; otherwise the Handler's Callback (if any) gets a chance, and finally handleMessage() is called.
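A sketch of those three dispatch paths and their priority (the class name is ours; the constructor, Callback, and overrides are standard Handler API):

import android.os.Handler;
import android.os.Looper;
import android.os.Message;

public class DispatchPrioritySketch {
    private final Handler handler = new Handler(Looper.getMainLooper(),
            new Handler.Callback() {
                @Override
                public boolean handleMessage(Message msg) {
                    // Path 2: mCallback, consulted when msg.callback == null.
                    // Returning true would stop the message here.
                    return false; // fall through to path 3
                }
            }) {
        @Override
        public void handleMessage(Message msg) {
            // Path 3: only reached when msg.callback == null and the
            // Callback above returned false.
        }
    };

    public void demo() {
        handler.post(() -> { /* Path 1: msg.callback, run by handleCallback() */ });
        handler.sendEmptyMessage(0); // goes through path 2, then path 3
    }
}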
[Figure: the complete message loop]
Android 4.0
It is roughly 95% the same as Android 5.0 and above; the only difference is that the native Looper.cpp uses a pipe rather than an eventfd file descriptor:
Looper::Looper(bool allowNonCallbacks) :
mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false),
mResponseIndex(0), mNextMessageUptime(LLONG_MAX) {
int wakeFds[2];
int result = pipe(wakeFds);
LOG_ALWAYS_FATAL_IF(result != 0, "Could not create wake pipe. errno=%d", errno);
mWakeReadPipeFd = wakeFds[0];
mWakeWritePipeFd = wakeFds[1];
result = fcntl(mWakeReadPipeFd, F_SETFL, O_NONBLOCK);
LOG_ALWAYS_FATAL_IF(result != 0, "Could not make wake read pipe non-blocking. errno=%d",
errno);
result = fcntl(mWakeWritePipeFd, F_SETFL, O_NONBLOCK);
LOG_ALWAYS_FATAL_IF(result != 0, "Could not make wake write pipe non-blocking. errno=%d",
errno);
// Allocate the epoll instance and register the wake pipe.
mEpollFd = epoll_create(EPOLL_SIZE_HINT);
LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance. errno=%d", errno);
struct epoll_event eventItem;
memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
eventItem.events = EPOLLIN;
eventItem.data.fd = mWakeReadPipeFd;
result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeReadPipeFd, & eventItem);
LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake read pipe to epoll instance. errno=%d",
errno);
}
The differences:
int result = pipe(wakeFds);
LOG_ALWAYS_FATAL_IF(result != 0, "Could not create wake pipe. errno=%d", errno);
mWakeReadPipeFd = wakeFds[0];
mWakeWritePipeFd = wakeFds[1];
.................
struct epoll_event eventItem;
memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
eventItem.events = EPOLLIN;
eventItem.data.fd = mWakeReadPipeFd;
result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeReadPipeFd, & eventItem);
epoll watches the read end of the pipe, mWakeReadPipeFd, while Looper::wake() writes a single byte to the write end, mWakeWritePipeFd:
void Looper::wake() {
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ wake", this);
#endif
ssize_t nWrite;
do {
nWrite = write(mWakeWritePipeFd, "W", 1);
} while (nWrite == -1 && errno == EINTR);
if (nWrite != 1) {
if (errno != EAGAIN) {
ALOGW("Could not write wake signal, errno=%d", errno);
}
}
}
This achieves the same wakeup effect.