Android线程调度分析

作者: 黑狗狗哥 | 来源:发表于2019-08-12 15:21 被阅读15次

    Android的主线程在初始化的时候,会调用Looper.prepare()和Looper.loop()方法,本文默认读者了解这些基础信息

    初始化

    Looper.java

        private static void prepare(boolean quitAllowed) {
            //一个线程只能调用一次Looper.prepare方法
            if (sThreadLocal.get() != null) {
                throw new RuntimeException("Only one Looper may be created per thread");
            }
            //ThreadLocal(线程私有)的意思是,这个对象线程不共享,只对当前线程可见
            sThreadLocal.set(new Looper(quitAllowed));
        }
    

        private Looper(boolean quitAllowed) {
            //初始化消息队列
            mQueue = new MessageQueue(quitAllowed);
            mThread = Thread.currentThread();
        }
    

    MessageQueue.java

    MessageQueue(boolean quitAllowed) {
        mQuitAllowed = quitAllowed;
       //这个nativeInit返回的是native层的NativeMessageQueue的地址
        mPtr = nativeInit();
    }
    

    通过natvie映射关系可以找到,nativeInit最后调用的是android_os_MessageQueue.cpp的android_os_MessageQueue_nativeInit函数

    static const JNINativeMethod gMessageQueueMethods[] = {
    /* name, signature, funcPtr */
    { "nativeInit", "()J", (void*)android_os_MessageQueue_nativeInit },
    { "nativeDestroy", "(J)V", (void*)android_os_MessageQueue_nativeDestroy },
    { "nativePollOnce", "(JI)V", (void*)android_os_MessageQueue_nativePollOnce },
    { "nativeWake", "(J)V", (void*)android_os_MessageQueue_nativeWake },
    { "nativeIsPolling", "(J)Z", (void*)android_os_MessageQueue_nativeIsPolling },
    { "nativeSetFileDescriptorEvents", "(JII)V",
            (void*)android_os_MessageQueue_nativeSetFileDescriptorEvents },};
    

    android_os_MessageQueue.cpp

    static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) {
    //初始化NativeMessageQueue
    NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();
    if (!nativeMessageQueue) {
        jniThrowRuntimeException(env, "Unable to allocate native queue");
        return 0;
    }
    nativeMessageQueue->incStrong(env);
    return reinterpret_cast<jlong>(nativeMessageQueue);}
    

    NativeMessageQueue::NativeMessageQueue() :
        mPollEnv(NULL), mPollObj(NULL), mExceptionObj(NULL) {
    mLooper = Looper::getForThread();
    if (mLooper == NULL) {
        //初始化native层的looper
        mLooper = new Looper(false);
        Looper::setForThread(mLooper);
    }}
    

    Looper.cpp

      Looper::Looper(bool allowNonCallbacks)
    : mAllowNonCallbacks(allowNonCallbacks),
      mSendingMessage(false),
      mPolling(false),
      mEpollRebuildRequired(false),
      mNextRequestSeq(0),
      mResponseIndex(0),
      mNextMessageUptime(LLONG_MAX) {
    //这个fd主要用于唤醒线程,用于管道的数据传输,管道的一端写线程通过向管道内些数据,另外一端的线程则通过管道来读数据
    mWakeEventFd.reset(eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC));
    LOG_ALWAYS_FATAL_IF(mWakeEventFd.get() < 0, "Could not make wake event fd: %s", strerror(errno));
    AutoMutex _l(mLock);
    rebuildEpollLocked();}
    

    void Looper::rebuildEpollLocked() {
    ...
    //这段代码的意思主要是,通过epoll机制监听mWakeEventFd的in事件。这个fd主要用于唤醒线程
    struct epoll_event eventItem;
    memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
    eventItem.events = EPOLLIN;
    eventItem.data.fd = mWakeEventFd.get();
    int result = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, mWakeEventFd.get(), &eventItem);
         ...
    }
    
    总结下初始化做了什么
    1. 初始化java层的Looper对象,并且将其设置为线程私有,初始化java层的消息队列MessageQueue。
    2. 初始化native层的Looper对象,NativeMesasgeQueue对象。
    3. 并且在native层监听WakeEventFd的的IN事件。

    消息读取

    Looper.java

    public static void loop() {
        final Looper me = myLooper();
        if (me == null) {
            throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread.");
        }
        final MessageQueue queue = me.mQueue;
            ...
        for (;;) {
           //这句话可能会让线程进入waiting状态
            Message msg = queue.next();
            ...
     try {  
                //这里就处理消息了
                msg.target.dispatchMessage(msg);
                dispatchEnd = needEndTime ? SystemClock.uptimeMillis() : 0;
            } finally {
                if (traceTag != 0) {
                    Trace.traceEnd(traceTag);
                }
            }
           ...
    }
    

    MessageQueue.java

    Message next() {
       ...
        for (;;) {
            if (nextPollTimeoutMillis != 0) {
                Binder.flushPendingCommands();
            }
            nativePollOnce(ptr, nextPollTimeoutMillis);
            ...
    }
    

    android_os_MessageQueue.cpp

    static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,
        jlong ptr, jint timeoutMillis) {
    NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
    nativeMessageQueue->pollOnce(env, obj, timeoutMillis);
    }
    

    void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {
    mPollEnv = env;
    mPollObj = pollObj;
    mLooper->pollOnce(timeoutMillis);
       ...
    }
    

    Looper.cpp

    inline int pollOnce(int timeoutMillis) {
        return pollOnce(timeoutMillis, nullptr, nullptr, nullptr);
    }
    

    int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
    int result = 0;
    for (;;) {
    ...
        if (result != 0) {
    ...
            return result;
        }
        result = pollInner(timeoutMillis);
        }
    }
    

    int Looper::pollInner(int timeoutMillis) {
    ...
    struct epoll_event eventItems[EPOLL_MAX_EVENTS];
    //这句代码可能会让线程处于空闲状态(让出CPU),如果没有事件发生的话,这个线程则会处于空闲状态
    int eventCount = epoll_wait(mEpollFd.get(), eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
    ...
    for (int i = 0; i < eventCount; i++) {
        int fd = eventItems[i].data.fd;
        uint32_t epollEvents = eventItems[i].events;
        if (fd == mWakeEventFd.get()) {
            if (epollEvents & EPOLLIN) {
                awoken();
            } else {
                ALOGW("Ignoring unexpected epoll events 0x%x on wake event fd.", epollEvents);
            }
        } else {
        ... 
    }
    }
    

    void Looper::awoken() {
    #if DEBUG_POLL_AND_WAKE
    ALOGD("%p ~ awoken", this);
    #endif
    uint64_t counter;
    //这一句是为了把管道中的数据清空
    TEMP_FAILURE_RETRY(read(mWakeEventFd.get(), &counter, sizeof(uint64_t)));
    }
    
    总结下消息读取主要做了什么
    1. java端通过死循环来读取Message。
    2. native端则是通过epoll机制监听是否有消息需要处理的事件
    3. 这里有个问题,为什么java端有死循环,却不会让主线程进入ANR状态呢?因为在native端,pollInner函数中会调用epoll_wait函数,这个epoll_wait函数可能会让线程处于wait状态,处于wait状态的线程不会占用CPU。

    消息发送

    Handler.java

    public final boolean sendMessage(Message msg)
    {
        return sendMessageDelayed(msg, 0);
    }
    

     public final boolean sendMessageDelayed(Message msg, long delayMillis)
    {
        if (delayMillis <    0) {
            delayMillis = 0;
        }
        return sendMessageAtTime(msg, SystemClock.uptimeMillis() + delayMillis);
    }
    

     public boolean sendMessageAtTime(Message msg, long uptimeMillis) {
        MessageQueue queue = mQueue;
        if (queue == null) {
            RuntimeException e = new RuntimeException(
                    this + " sendMessageAtTime() called with no mQueue");
            Log.w("Looper", e.getMessage(), e);
            return false;
        }
        return enqueueMessage(queue, msg, uptimeMillis);
    }
    

    private boolean enqueueMessage(MessageQueue queue, Message msg, long uptimeMillis) {
        msg.target = this;
        if (mAsynchronous) {
            msg.setAsynchronous(true);
        }
        return queue.enqueueMessage(msg, uptimeMillis);
    }
    

    MessageQueue.java

    boolean enqueueMessage(Message msg, long when) {
    ...
            msg.markInUse();
            msg.when = when;
            Message p = mMessages;
            boolean needWake;
            if (p == null || when == 0 || when < p.when) {
                //如果消息队列里面没有消息,就把当前的消息放在队列头,并且这是needWake为true,意思是需要唤醒waiting中的主线程
                msg.next = p;
                mMessages = msg;
                needWake = mBlocked;
            } else {
                //否则,就把当前消息放在队列中适当的位置,然后needWake为false,因为此时的主线程正处于running状态了。
                for (;;) {
                    prev = p;
                    p = p.next;
                    if (p == null || when < p.when) {
                        break;
                    }
                    if (needWake && p.isAsynchronous()) {
                        needWake = false;
                    }
                }
                msg.next = p; // invariant: p == prev.next
                prev.next = msg;
            }
    
            // We can assume mPtr != 0 because mQuitting is false.
            if (needWake) {
                nativeWake(mPtr);
            }
        }
        return true;
    }
    

    android_os_MessageQueue.cpp

    static void android_os_MessageQueue_nativeWake(JNIEnv* env, jclass clazz, jlong ptr) {
    NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
    nativeMessageQueue->wake();
    }
    

    void NativeMessageQueue::wake() {
    mLooper->wake();
    }
    

    Looper.cpp

    void Looper::wake() {
     ...
    uint64_t inc = 1;
     //这一步主要是往fd中写入数据,只要wakeFd中有数据了,那么epoll_wait对应的线程就会被唤醒,继续执行
    ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd.get(), &inc, sizeof(uint64_t)));
    ...
    }
    }
    
    总结下消息发送的主要做了什么
    1. 首先,在java层,发送Message的时候,会把这个message放进消息队列中去。
    2. 其次再通知native层,把对应的线程唤醒,以便处理消息

    相关文章

      网友评论

        本文标题:Android线程调度分析

        本文链接:https://www.haomeiwen.com/subject/yydujctx.html