ALooper简析

作者: TankWitch | 来源:发表于2017-08-09 21:40 被阅读0次

    接着ALooper、AMessage、AHandler的简述,现在来分析下ALooper的声明以及定义。

    声明

    这篇重在细节,废话不多说,各位看官搬好小板凳,且请看代码(已加注释)。

    #include <media/stagefright/foundation/ABase.h>
    #include <media/stagefright/foundation/AString.h>
    #include <utils/Errors.h>
    #include <utils/KeyedVector.h>
    #include <utils/List.h>
    #include <utils/RefBase.h>
    #include <utils/threads.h>
    
    namespace android {
    //前置声明,在ALooper中只用到了这三个结构体的指针或者引用。至于为什么要使用前置声明,我在C/C++专栏会写
    struct AHandler;
    struct AMessage;
    struct AReplyToken;
    
    struct ALooper : public RefBase {
        typedef int32_t event_id;
        typedef int32_t handler_id;
        //构造函数
        ALooper();
    
        // Takes effect in a subsequent call to start().
        //给这个Looper起名字
        void setName(const char *name);
        //将一个Handler注册其中
        handler_id registerHandler(const sp<AHandler> &handler);
        //注销对应ID的AHandler
        void unregisterHandler(handler_id handlerID);
        //启动该Looper的独立线程
        status_t start(
                bool runOnCallingThread = false,
                bool canCallJava = false,
                int32_t priority = PRIORITY_DEFAULT
                );
        //停止该Looper的独立线程
        status_t stop();
        //获得当前的系统时间
        static int64_t GetNowUs();
        //获取当前Looper的名字
        const char *getName() const {
            return mName.c_str();
        }
    
    protected:
        virtual ~ALooper();
    
    private:
        friend struct AMessage;       // post()
        //包装Message
        struct Event {
            int64_t mWhenUs;
            sp<AMessage> mMessage;
        };
    
        Mutex mLock;
        Condition mQueueChangedCondition;
    
        AString mName;
    
        List<Event> mEventQueue;
        //继承于Thread
        struct LooperThread;
        sp<LooperThread> mThread;
        bool mRunningLocally;
    
        // use a separate lock for reply handling, as it is always on another thread
        // use a central lock, however, to avoid creating a mutex for each reply
        Mutex mRepliesLock;
        Condition mRepliesCondition;
    
        // START --- methods used only by AMessage
    
        // posts a message on this looper with the given timeout
        void post(const sp<AMessage> &msg, int64_t delayUs);
    
        // creates a reply token to be used with this looper
        sp<AReplyToken> createReplyToken();
        // waits for a response for the reply token.  If status is OK, the response
        // is stored into the supplied variable.  Otherwise, it is unchanged.
        status_t awaitResponse(const sp<AReplyToken> &replyToken, sp<AMessage> *response);
        // posts a reply for a reply token.  If the reply could be successfully posted,
        // it returns OK. Otherwise, it returns an error value.
        status_t postReply(const sp<AReplyToken> &replyToken, const sp<AMessage> &msg);
    
        // END --- methods used only by AMessage
        //如果有队列中有消息,且未超过等待时间,则将队列头的消息交付给对应的handler处理
        bool loop();
    
        DISALLOW_EVIL_CONSTRUCTORS(ALooper);
    };
    

    下面,将就代码中和结构相关的部分简要分析一下。


    struct Event {
        int64_t mWhenUs;
        sp<AMessage> mMessage;
        };
    
    List<Event> mEventQueue;
    

    可以看到,Event结构体对Message以及该Message所对应的入队时间进行的组装。而后,又将这些进入该Looper的消息进行的一个排队(用一个List包起来)。如果大家还记得之前《ALooper、AMessage、AHandler的简述》一篇的图1的话,就能很好的理解,为什么称ALooper为消息队列了。

    定义


    下面,将就代码中和重要功能相关的部分简要分析一下。


    1. void ALooper::post(const sp<AMessage> &msg, int64_t delayUs)

    168    void ALooper::post(const sp<AMessage> &msg, int64_t delayUs) {
    169    Mutex::Autolock autoLock(mLock);
    170
    171    int64_t whenUs;
    172    if (delayUs > 0) {
    173        whenUs = GetNowUs() + delayUs;
    174    } else {
    175        whenUs = GetNowUs();
    176    }
    177
    178    List<Event>::iterator it = mEventQueue.begin();
    179    while (it != mEventQueue.end() && (*it).mWhenUs <= whenUs) {
    180        ++it;
    181    }
    182
    183    Event event;
    184    event.mWhenUs = whenUs;
    185    event.mMessage = msg;
    186
    187    if (it == mEventQueue.begin()) {
    188        mQueueChangedCondition.signal();
    189    }
    190
    191    mEventQueue.insert(it, event);
    192}
    

    这个函数很简单。

    1. 根据设置的delayUS(延时交付时间)和NowUS(系统时间)来计算真正这条Message需要交付的时间。
    2. 之后再根据这个时间计算当前Message在消息队列中的位置。
    3. 如果该消息是队列头消息,就通知一下(等待中的线程中只有一个会被唤醒执行)。
    4. 如果不是就加入到队列的相应位置中。

    2. status_t postReply(const sp<AReplyToken> &replyToken, const sp<AMessage> &msg);

    252    status_t ALooper::postReply(const sp<AReplyToken> &replyToken, const sp<AMessage> &reply) {
    253    Mutex::Autolock autoLock(mRepliesLock);
    254    status_t err = replyToken->setReply(reply);
    255    if (err == OK) {
    256        mRepliesCondition.broadcast();
    257    }
    258    return err;
    259}
    

    先看看replyToken是个什么鬼(他住在AMessage的家里)

    33    struct AReplyToken : public RefBase {
    34    AReplyToken(const sp<ALooper> &looper)
    35        : mLooper(looper),
    36          mReplied(false) {
    37    }
    

    再看看setReply做了什么事情。

    40   status_t AReplyToken::setReply(const sp<AMessage> &reply) {
    41    if (mReplied) {
    42        ALOGE("trying to post a duplicate reply");
    43        return -EBUSY;
    44    }
    45    CHECK(mReply == NULL);
    46    mReply = reply;
    47    mReplied = true;
    48    return OK;
    49}
    

    现在就很清楚了

    1. 创建一个和当前Looper相关联的AReplyToken ,并且初始化reply状态为false,就是还没有回复。
    2. 之后将reply的消息和reply的状态(变为true,意为这个时候已经回复了)给这个ReplyToken
    3. 如果成功返回OK,就广播一下(等待的线程都会被唤醒)

    3. status_t ALooper::start(bool runOnCallingThread, bool canCallJava, int32_t priority)

    96    status_t ALooper::start(
    97        bool runOnCallingThread, bool canCallJava, int32_t priority) {
    98    if (runOnCallingThread) {
    99        {
    100            Mutex::Autolock autoLock(mLock);
    101
    102            if (mThread != NULL || mRunningLocally) {
    103                return INVALID_OPERATION;
    104            }
    105
    106            mRunningLocally = true;
    107        }
    108
    109        do {
    110        } while (loop());
    111
    112        return OK;
    113    }
    114
    115    Mutex::Autolock autoLock(mLock);
    116
    117    if (mThread != NULL || mRunningLocally) {
    118        return INVALID_OPERATION;
    119    }
    120
    121    mThread = new LooperThread(this, canCallJava);
    122
    123    status_t err = mThread->run(
    124            mName.empty() ? "ALooper" : mName.c_str(), priority);
    125    if (err != OK) {
    126        mThread.clear();
    127    }
    128
    129    return err;
    130}
    

    首先,我们可以知道这是该Looper线程启动的函数. 当设置了runOnCallingThread为true的时候, 对应逻辑里面有一个重要的代码段:

        do {
        } while (loop());
    

    那这个loop() 是干嘛用的? 我们来看一看.

    194    bool ALooper::loop() {
    195    Event event;
    196
    197    {
    198        Mutex::Autolock autoLock(mLock);
    199        if (mThread == NULL && !mRunningLocally) {
    200            return false;
    201        }//如果没有分到线程且没有在本地运行,返回错误
    202        if (mEventQueue.empty()) {
    203            mQueueChangedCondition.wait(mLock);
    204            return true;
    205        }
    206        int64_t whenUs = (*mEventQueue.begin()).mWhenUs;
    207        int64_t nowUs = GetNowUs();
    208
    209        if (whenUs > nowUs) {
    210            int64_t delayUs = whenUs - nowUs;
    211            mQueueChangedCondition.waitRelative(mLock, delayUs * 1000ll);
    212
    213            return true;
    214        }
    215
    216        event = *mEventQueue.begin();
    217        mEventQueue.erase(mEventQueue.begin());
    218    }
    219
    220    event.mMessage->deliver();
    221
    222    // NOTE: It's important to note that at this point our "ALooper" object
    223    // may no longer exist (its final reference may have gone away while
    224    // delivering the message). We have made sure, however, that loop()
    225    // won't be called again.
    226
    227    return true;
    228}
    

    重要的有以下几点

        if (mEventQueue.empty()) {
            mQueueChangedCondition.wait(mLock);
            return true;
         }
    

    不知道大家是否还记得,之前讲异步消息机制简述那一篇中, 有提到过说, ALooper会检查消息队列中消息是否为空, 如果为空, 则等待事件, 降低CPU资源的消耗. 那么这段代码就是上面所说的具体实现了.

    之后, 会判断whenUS和nowUS, 根据这两个值的比较来判断是否还需要等待一段时间处理Event.

    紧接着, 用上面定义了的Event event, 拿到消息队列的列头, 并清除原队列的列头.

    最后, 交付这个消息. (这个交付的实现, 会在AMessage中讲解)

    好回到start()代码的接下来的流程中. 接下来就很简单了:

    1. 创建一个LooperThread的实例;
    2. 把这个实例线程跑起来.

    接下来,讲两个和AMessage::postAndAwaitResponse方法紧密相连的两个方法. 一个是ALooper::createReplyToken(), 而另外一个是ALooper::awaitResponse(const sp<AReplyToken> &replyToken, sp<AMessage> *response)

    首先, 来了解下

    4. ALooper::createReplyToken()

    这个玩意儿的代码如下:

    230    // to be called by AMessage::postAndAwaitResponse only
    231    sp<AReplyToken> ALooper::createReplyToken() {
    232    return new AReplyToken(this);
    233    }
    

    没错, 没干什么事情, 只是用当前的Looper, 实例化了一个AReplyToken(在第2点钟有提到这个东东)并返回.

    接着,来看这个

    5. ALooper::awaitResponse(const sp<AReplyToken> &replyToken, sp<AMessage> *response)

    它长这个样子:

    236    status_t ALooper::awaitResponse(const sp<AReplyToken> &replyToken, sp<AMessage> *response) {
    237    // return status in case we want to handle an interrupted wait
    238    Mutex::Autolock autoLock(mRepliesLock);
    239    CHECK(replyToken != NULL);
    240    while (!replyToken->retrieveReply(response)) {
    241        {
    242            Mutex::Autolock autoLock(mLock);
    243            if (mThread == NULL) {
    244                return -ENOENT;
    245            }
    246        }
    247        mRepliesCondition.wait(mRepliesLock);
    248    }
    249    return OK;
    250}
    

    我们遇到了陌生的retrieveReply, 他长这个样子:

    49    // if reply is not set, returns false; otherwise, it retrieves the reply and returns true
    50    bool retrieveReply(sp<AMessage> *reply) {
    51        if (mReplied) {
    52            *reply = mReply;
    53            mReply.clear();
    54        }
    55        return mReplied;
    56    }
    

    原生注释已经说得很清楚了. 如果replay没有设置,就返回false; 否则, 就获得这个replay的Message并返回true.

    回到awaitResponse中. 当retrieveReply返回false, 也就是说没有拿到需要replay的Message的时候, 等待着, 一直等待到拿到这个消息为止(各位看官, 这里就是为了实现postAndAwaitResponse的同步的目的了哦)


    总结

    这篇讲述了一个叫ALooper的履带。它将AMessage根据交付时间(whenUs)进行排队(List<mEvent>)。然后简单介绍了它是怎么将消息(AMessage交付的(有同步的, 也有异步的))。关于交付这一块是不完整的,大部分的逻辑在AMessage中,今天先介绍到这里,有空再写最复杂的AMessage和相对简单的AHandler。

    相关文章

      网友评论

        本文标题:ALooper简析

        本文链接:https://www.haomeiwen.com/subject/bksfrxtx.html