Android线程调度分析

作者: 黑狗狗哥 | 来源:发表于2019-08-12 15:21 被阅读15次

Android的主线程在初始化的时候,会调用Looper.prepare()和Looper.loop()方法,本文默认读者了解这些基础信息

初始化

Looper.java

    private static void prepare(boolean quitAllowed) {
        //一个线程只能调用一次Looper.prepare方法
        if (sThreadLocal.get() != null) {
            throw new RuntimeException("Only one Looper may be created per thread");
        }
        //ThreadLocal(线程私有)的意思是,这个对象线程不共享,只对当前线程可见
        sThreadLocal.set(new Looper(quitAllowed));
    }

    private Looper(boolean quitAllowed) {
        //初始化消息队列
        mQueue = new MessageQueue(quitAllowed);
        mThread = Thread.currentThread();
    }

MessageQueue.java

MessageQueue(boolean quitAllowed) {
    mQuitAllowed = quitAllowed;
   //这个nativeInit返回的是native层的NativeMessageQueue的地址
    mPtr = nativeInit();
}

通过natvie映射关系可以找到,nativeInit最后调用的是android_os_MessageQueue.cpp的android_os_MessageQueue_nativeInit函数

static const JNINativeMethod gMessageQueueMethods[] = {
/* name, signature, funcPtr */
{ "nativeInit", "()J", (void*)android_os_MessageQueue_nativeInit },
{ "nativeDestroy", "(J)V", (void*)android_os_MessageQueue_nativeDestroy },
{ "nativePollOnce", "(JI)V", (void*)android_os_MessageQueue_nativePollOnce },
{ "nativeWake", "(J)V", (void*)android_os_MessageQueue_nativeWake },
{ "nativeIsPolling", "(J)Z", (void*)android_os_MessageQueue_nativeIsPolling },
{ "nativeSetFileDescriptorEvents", "(JII)V",
        (void*)android_os_MessageQueue_nativeSetFileDescriptorEvents },};

android_os_MessageQueue.cpp

static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) {
//初始化NativeMessageQueue
NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();
if (!nativeMessageQueue) {
    jniThrowRuntimeException(env, "Unable to allocate native queue");
    return 0;
}
nativeMessageQueue->incStrong(env);
return reinterpret_cast<jlong>(nativeMessageQueue);}

NativeMessageQueue::NativeMessageQueue() :
    mPollEnv(NULL), mPollObj(NULL), mExceptionObj(NULL) {
mLooper = Looper::getForThread();
if (mLooper == NULL) {
    //初始化native层的looper
    mLooper = new Looper(false);
    Looper::setForThread(mLooper);
}}

Looper.cpp

  Looper::Looper(bool allowNonCallbacks)
: mAllowNonCallbacks(allowNonCallbacks),
  mSendingMessage(false),
  mPolling(false),
  mEpollRebuildRequired(false),
  mNextRequestSeq(0),
  mResponseIndex(0),
  mNextMessageUptime(LLONG_MAX) {
//这个fd主要用于唤醒线程,用于管道的数据传输,管道的一端写线程通过向管道内些数据,另外一端的线程则通过管道来读数据
mWakeEventFd.reset(eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC));
LOG_ALWAYS_FATAL_IF(mWakeEventFd.get() < 0, "Could not make wake event fd: %s", strerror(errno));
AutoMutex _l(mLock);
rebuildEpollLocked();}

void Looper::rebuildEpollLocked() {
...
//这段代码的意思主要是,通过epoll机制监听mWakeEventFd的in事件。这个fd主要用于唤醒线程
struct epoll_event eventItem;
memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
eventItem.events = EPOLLIN;
eventItem.data.fd = mWakeEventFd.get();
int result = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, mWakeEventFd.get(), &eventItem);
     ...
}
总结下初始化做了什么
  1. 初始化java层的Looper对象,并且将其设置为线程私有,初始化java层的消息队列MessageQueue。
  2. 初始化native层的Looper对象,NativeMesasgeQueue对象。
  3. 并且在native层监听WakeEventFd的的IN事件。

消息读取

Looper.java

public static void loop() {
    final Looper me = myLooper();
    if (me == null) {
        throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread.");
    }
    final MessageQueue queue = me.mQueue;
        ...
    for (;;) {
       //这句话可能会让线程进入waiting状态
        Message msg = queue.next();
        ...
 try {  
            //这里就处理消息了
            msg.target.dispatchMessage(msg);
            dispatchEnd = needEndTime ? SystemClock.uptimeMillis() : 0;
        } finally {
            if (traceTag != 0) {
                Trace.traceEnd(traceTag);
            }
        }
       ...
}

MessageQueue.java

Message next() {
   ...
    for (;;) {
        if (nextPollTimeoutMillis != 0) {
            Binder.flushPendingCommands();
        }
        nativePollOnce(ptr, nextPollTimeoutMillis);
        ...
}

android_os_MessageQueue.cpp

static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,
    jlong ptr, jint timeoutMillis) {
NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
nativeMessageQueue->pollOnce(env, obj, timeoutMillis);
}

void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {
mPollEnv = env;
mPollObj = pollObj;
mLooper->pollOnce(timeoutMillis);
   ...
}

Looper.cpp

inline int pollOnce(int timeoutMillis) {
    return pollOnce(timeoutMillis, nullptr, nullptr, nullptr);
}

int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
int result = 0;
for (;;) {
...
    if (result != 0) {
...
        return result;
    }
    result = pollInner(timeoutMillis);
    }
}

int Looper::pollInner(int timeoutMillis) {
...
struct epoll_event eventItems[EPOLL_MAX_EVENTS];
//这句代码可能会让线程处于空闲状态(让出CPU),如果没有事件发生的话,这个线程则会处于空闲状态
int eventCount = epoll_wait(mEpollFd.get(), eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
...
for (int i = 0; i < eventCount; i++) {
    int fd = eventItems[i].data.fd;
    uint32_t epollEvents = eventItems[i].events;
    if (fd == mWakeEventFd.get()) {
        if (epollEvents & EPOLLIN) {
            awoken();
        } else {
            ALOGW("Ignoring unexpected epoll events 0x%x on wake event fd.", epollEvents);
        }
    } else {
    ... 
}
}

void Looper::awoken() {
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ awoken", this);
#endif
uint64_t counter;
//这一句是为了把管道中的数据清空
TEMP_FAILURE_RETRY(read(mWakeEventFd.get(), &counter, sizeof(uint64_t)));
}
总结下消息读取主要做了什么
  1. java端通过死循环来读取Message。
  2. native端则是通过epoll机制监听是否有消息需要处理的事件
  3. 这里有个问题,为什么java端有死循环,却不会让主线程进入ANR状态呢?因为在native端,pollInner函数中会调用epoll_wait函数,这个epoll_wait函数可能会让线程处于wait状态,处于wait状态的线程不会占用CPU。

消息发送

Handler.java

public final boolean sendMessage(Message msg)
{
    return sendMessageDelayed(msg, 0);
}

 public final boolean sendMessageDelayed(Message msg, long delayMillis)
{
    if (delayMillis <    0) {
        delayMillis = 0;
    }
    return sendMessageAtTime(msg, SystemClock.uptimeMillis() + delayMillis);
}

 public boolean sendMessageAtTime(Message msg, long uptimeMillis) {
    MessageQueue queue = mQueue;
    if (queue == null) {
        RuntimeException e = new RuntimeException(
                this + " sendMessageAtTime() called with no mQueue");
        Log.w("Looper", e.getMessage(), e);
        return false;
    }
    return enqueueMessage(queue, msg, uptimeMillis);
}

private boolean enqueueMessage(MessageQueue queue, Message msg, long uptimeMillis) {
    msg.target = this;
    if (mAsynchronous) {
        msg.setAsynchronous(true);
    }
    return queue.enqueueMessage(msg, uptimeMillis);
}

MessageQueue.java

boolean enqueueMessage(Message msg, long when) {
...
        msg.markInUse();
        msg.when = when;
        Message p = mMessages;
        boolean needWake;
        if (p == null || when == 0 || when < p.when) {
            //如果消息队列里面没有消息,就把当前的消息放在队列头,并且这是needWake为true,意思是需要唤醒waiting中的主线程
            msg.next = p;
            mMessages = msg;
            needWake = mBlocked;
        } else {
            //否则,就把当前消息放在队列中适当的位置,然后needWake为false,因为此时的主线程正处于running状态了。
            for (;;) {
                prev = p;
                p = p.next;
                if (p == null || when < p.when) {
                    break;
                }
                if (needWake && p.isAsynchronous()) {
                    needWake = false;
                }
            }
            msg.next = p; // invariant: p == prev.next
            prev.next = msg;
        }

        // We can assume mPtr != 0 because mQuitting is false.
        if (needWake) {
            nativeWake(mPtr);
        }
    }
    return true;
}

android_os_MessageQueue.cpp

static void android_os_MessageQueue_nativeWake(JNIEnv* env, jclass clazz, jlong ptr) {
NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
nativeMessageQueue->wake();
}

void NativeMessageQueue::wake() {
mLooper->wake();
}

Looper.cpp

void Looper::wake() {
 ...
uint64_t inc = 1;
 //这一步主要是往fd中写入数据,只要wakeFd中有数据了,那么epoll_wait对应的线程就会被唤醒,继续执行
ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd.get(), &inc, sizeof(uint64_t)));
...
}
}
总结下消息发送的主要做了什么
  1. 首先,在java层,发送Message的时候,会把这个message放进消息队列中去。
  2. 其次再通知native层,把对应的线程唤醒,以便处理消息

相关文章

  • Android线程调度分析

    Android的主线程在初始化的时候,会调用Looper.prepare()和Looper.loop()方法,本文...

  • RxJava

    一、Schedulers(调度器)1.解决Android主线程问题【针对Android】2.解决多线程线程问题 二...

  • 线程优化

    1Android线程调度原理剖析 线程调度原理 任意时刻,只有一个线程占用CPU,处于运行状态。多线程并发:轮流获...

  • 五. Android 线程优化

    1. Android线程调度原理剖析 线程调度原理:任一时刻,只有一个线程占用CPU,处于运行状态 多线程并发:轮...

  • RxJava响应式编程

    常用名词说明 Schedulers(调度器) 解决Android主线程问题; 解决多线程线程问题 Observab...

  • Android线程管理(一)——线程通信

    线程通信、ActivityThread及Thread类是理解Android线程管理的关键。 线程,作为CPU调度资...

  • Android 线程优化

    Android线程调度机制 *分时调度模式:所有的线程轮流获得CPU的使用权,平均分配每个线程占用的CPU时间。 ...

  • RxJava2 源码分析二

    文章目录 前言 RxJava2 线程调度 RxJava2 怎么进行线程调度 总结 前言 经过RxJava2源码分析...

  • 线程

    一、Android线程的基本介绍 在操作系统中,线程是操作系统调度的最小单元

  • Java线程模型

    Java线程模型 本文将从线程类型、线程通信、线程调度三个方面分析Java中的线程模型。 什么是线程? 线程就是进...

网友评论

    本文标题:Android线程调度分析

    本文链接:https://www.haomeiwen.com/subject/yydujctx.html