美文网首页
IM客户端开发(3)——接收线程

IM客户端开发(3)——接收线程

作者: Magic11 | 来源:发表于2019-12-06 10:12 被阅读0次

    1、 接收线程负责发送心跳、读取消息、检查已发送的消息是否超时、检查网络是否还活着等

    // Body of the receive thread.
    //
    // While GetRecvLoop() stays true, each pass sends a heartbeat if one is due,
    // reads a pending message, expires timed-out requests and checks whether
    // the connection is still alive. Once the loop ends it waits briefly for
    // the send thread, clears the context and marks the receive loop as exited.
    void GSKCNet::recvLoop()
    {
        SetRecvLoop(true);
        SetExitRecvLoop(false);

        for (; GetRecvLoop(); )
        {
            sendHeartMsg();      // send a heartbeat when the conditions are met
            readMsg();           // read a message if the socket has data
            handleTimeoutMsg();  // fail requests whose timeout has elapsed
            checkNetActive();    // check that the connection is still alive
        }

        // Wait for the send thread to exit so that every pending request
        // callback can be handled correctly. The wait is bounded: at most
        // 21 sleeps of 10 ms each.
        for (int waited = 1; !GetExitSendLoop(); ++waited)
        {
            glodon::sleep(10);
            if (waited > 20)
            {
                break;
            }
        }

        ClearConext();
        SetLastActiveTime(0);
        m_iLastSendHeartTime = 0;
        SetExitRecvLoop(true);
    }
    

    心跳发送的逻辑如下:
    当发送队列为空且100秒内没有任何消息收发成功时,发送心跳

    void GSKCNet::sendHeartMsg()
    {
        //当前时间秒数
        time_t now = time(NULL);
        //心跳开始、发送对列为空、100s没有任何消息收发(成功)
        if (SendQueueIsEmpty() && m_bHeartSendStart 
            && (now - GetLastActiveTime() >= m_iHeartInterval) 
            && (now - m_iLastSendHeartTime >= m_iHeartInterval))
        {
            m_iLastSendHeartTime = now;
    
            GSKPacket *pPacket = createPacket(CMD_HEART,0,"");
            pPacket->setBody("");
    
            GSKRequest* pRequest = new GSKRequest(pPacket, 1000,g_heartCallback);
    
            PushToSendQueue(pRequest);   //将心跳请求放到发送队列
        }
    }
    

    接收消息的逻辑如下:
    1) 如果是服务器发来的消息(聊天消息、透传协议),直接放到回调队列queue<GSKResponse*> m_hasCallbackMap
    2) 如果是登录成功应答,启动心跳;如果是登录失败应答,停止心跳
    3) 如果是登出应答,停止心跳
    4) 如果是心跳应答,不做额外处理(与其他应答一样,随后仍会进入已发送消息map的匹配流程)
    5) 如果是发送请求应答,检查已发送消息的map m_hasSendedMap
    代码逻辑如下:

    void GSKCNet::readMsg()
    {
        //检查网络上是否有数据可以读取
        int iSocketStatus = m_pGSKSocket->Select();
        if (iSocketStatus == GSK_SELECT_ERROR)
        {
            m_iNetStatus = GSKNET_STATUS_DISCONNECT;
        }
        else if (iSocketStatus == GSK_SELECT_READ)
        {
            m_iNetStatus = GSKNET_STATUS_OK;
            GSKResponse *stGSKResponse = new GSKResponse();
            int iRet = HandleResponse(stGSKResponse);
            if(iRet == 0)
            {
                //处理服务器发来的消息
                SetLastActiveTime(time(NULL));
                if(stGSKResponse->stHead.wCmd==SUBCMD_PUSH_CHATMSG ||
                    stGSKResponse->stHead.wCmd==SUBCMD_PUSH_SYSTEM_PROTOCOL ||
                    stGSKResponse->stHead.wCmd==SUBCMD_PUSH_CUSTOM_PROYOCOL)
                {
                    stGSKResponse->code = iRet;
                    stGSKResponse->cb = g_pushCallback;
    
                    PushToCallbackQueue(stGSKResponse);
                    return;
                }
                else if(stGSKResponse->stHead.wCmd == CMD_HEART_ANSWER)
                {
                    //do nothing
                }
                else if(stGSKResponse->stHead.wCmd == SUBCMD_LOGIN_ANSWER_SUCCEED)
                {
                    StartHeartBeat();
                    SetIsLogged(true);
                }
                else if (stGSKResponse->stHead.wCmd == SUBCMD_RCODE_LOGIN_ANSWER_SUCCEED)
                {
                    SetIsLogged(true);
                }
                else if(stGSKResponse->stHead.wCmd == SUBCMD_LOGIN_ANSWER_FAILED)
                {
                    StopHeartBeat();
                }
                else if(stGSKResponse->stHead.wCmd == SUBCMD_LOGOUT_ANSWER)
                {
                    StopHeartBeat();
                }
    
                updateSendedMap(stGSKResponse);
            }
        }
    }
    

    读取并解析一条消息的逻辑如下:

    // Reads and parses one message from the socket into pGSKResponse.
    //
    // The header is pulled into m_recvBuffer in steps: the first 3 bytes
    // (start flag + command), then either 1 more byte for a heartbeat answer
    // or 4 more bytes holding the total message length. The rest of the message
    // is handled by HandleResp().
    //
    // Returns 0 on success, otherwise:
    //   -1  socket receive failed or returned nothing
    //   -2  start flag is not STX
    //   -3  heartbeat end flag is not ETX, or fewer than 7 header bytes buffered
    //   -4  non-positive message length
    //   -5 / -7  HandleResp() failed
    //   -6  message length is smaller than the bytes already buffered
    int GSKCNet::HandleResponse(GSKResponse* pGSKResponse)
    {
        // Receives exactly `wanted` header bytes and appends them to
        // m_recvBuffer. A stack buffer is used instead of malloc/free, so there
        // is no unchecked allocation and no cleanup on the error paths.
        // Returns false when nothing was appended.
        auto recvIntoBuffer = [this](int wanted) -> bool
        {
            char chunk[8];
            if (wanted <= 0 || wanted > static_cast<int>(sizeof(chunk)))
            {
                return false;
            }
            int iRet = m_pGSKSocket->Recv(chunk, wanted);
            if (iRet < 0)
            {
                handleRelogin(iRet);
                return false;
            }
            if (iRet == 0)
            {
                return false;
            }
            m_recvBuffer.append(chunk, iRet);
            return true;
        };

        // Bytes already buffered from a previous call.
        const int readedBytes = m_recvBuffer.readableBytes();

        if (readedBytes <= 3)
        {
            // Complete the first three bytes (start flag and command).
            if (readedBytes < 3 && !recvIntoBuffer(3 - readedBytes))
            {
                return -1;
            }

            // Validate the start flag.
            if (m_recvBuffer.peekInt8() != STX)
            {
                m_recvBuffer.retrieveAll();
                return -2;
            }

            if (m_recvBuffer.peekHeartBeatCmd() == CMD_HEART_ANSWER)
            {
                // A heartbeat answer only has one more byte: the end flag.
                if (!recvIntoBuffer(1))
                {
                    return -1;
                }
                if (m_recvBuffer.peekHeartBeatEnd() != ETX)
                {
                    m_recvBuffer.retrieveAll();
                    return -3;
                }
                m_recvBuffer.retrieveAll();

                pGSKResponse->stHead.wCmd = CMD_HEART_ANSWER;
                pGSKResponse->stHead.subCmd = 0;
                pGSKResponse->stHead.wClt = 3;
                pGSKResponse->stHead.seqid = 0;
                pGSKResponse->SetSequence(0);
                pGSKResponse->SetMsg("");
                return 0;
            }

            // Any other command: fetch the 4-byte total length.
            if (!recvIntoBuffer(4))
            {
                return -1;
            }
            if (m_recvBuffer.readableBytes() < 7)
            {
                return -3;
            }

            // Total message length.
            int msgLen = m_recvBuffer.peekMsgLength();
            if (msgLen <= 0 /*|| msgLen > 2048000 */)
            {
                m_recvBuffer.retrieveAll();
                return -4;
            }
            else if (msgLen > static_cast<int>(m_recvBuffer.readableBytes()))
            {
                if (HandleResp(msgLen, pGSKResponse) < 0)
                {
                    return -5;
                }
            }
            // NOTE(review): when msgLen does not exceed the buffered bytes the
            // buffer is dropped below and 0 is returned without filling
            // pGSKResponse — kept as in the original; confirm this is intended.
        }
        else
        {
            // NOTE(review): this path does not re-check the start flag and does
            // not reject a non-positive length — kept as in the original.
            if (readedBytes < 7 && !recvIntoBuffer(7 - readedBytes))
            {
                return -1;
            }

            // Total message length.
            int msgLen = m_recvBuffer.peekMsgLength();
            if (msgLen - static_cast<int>(m_recvBuffer.readableBytes()) < 0)
            {
                m_recvBuffer.retrieveAll();
                return -6;
            }

            if (HandleResp(msgLen, pGSKResponse) < 0)
            {
                return -7;
            }
        }

        m_recvBuffer.retrieveAll();

        return 0;
    }
    

    读取消息到缓冲区的过程如下:

    // Receives exactly iBufLen bytes into szBuf, waiting up to iTimeout seconds
    // for each readiness check.
    //
    // Returns:
    //   iBufLen  all requested bytes were read
    //   0        the socket is not connected (or iBufLen <= 0)
    //   -31      select() failed
    //   -32      select() timed out (errno is set to ETIMEDOUT)
    //   -33      recv() failed with an unrecoverable error; the socket is closed
    //   -34      recv() returned 0 (peer closed the link); the socket is closed
    //
    // NOTE(review): bytes already read before a -31/-32 return stay in szBuf
    // but their count is not reported to the caller.
    int GSKSocket::Recv( char* szBuf, int iBufLen , int iTimeout ) 
    {
        if( !IsConnected() )
        {
            return 0;
        }

        int iRecvBytes = 0;
        while (iRecvBytes < iBufLen)
        {
            // select() may modify both the fd set and the timeout, so they are
            // re-armed before every call instead of once before the loop.
            fd_set stFdSet;
            FD_ZERO(&stFdSet);
            FD_SET(m_iSocket, &stFdSet);

            struct timeval stTimeout;
            stTimeout.tv_sec = iTimeout;
            stTimeout.tv_usec = 0;

            int iRet = select(m_iSocket + 1, &stFdSet, NULL, NULL, &stTimeout);
            if (iRet == -1)
            {
                //select socket for read failed 
                return -31;
            }
            if (iRet == 0)
            {
                //select socket for read timeout
                errno = ETIMEDOUT;
                return -32;
            }

            iRet = recv(m_iSocket, szBuf + iRecvBytes, iBufLen - iRecvBytes, 0);
            if (iRet == -1)
            {
                if (errno == EINPROGRESS || errno == EWOULDBLOCK || errno == EINTR)
                {
                    // Transient condition: wait for readiness again.
                    continue;
                }
                //recv data failed,link break
                Close();
                return -33;
            }
            if (iRet == 0)
            {
                // recv() returning 0 means the peer closed the connection; it
                // does not set errno, so errno must not be consulted here (a
                // stale EINTR would otherwise make this loop spin forever).
                Close();
                return -34;
            }

            iRecvBytes += iRet;
        }

        return iRecvBytes;
    }
    

    当recv返回0(对端已关闭连接)或返回不可恢复的错误时,关闭当前socket

    // Closes the socket if one is open and marks the handle as invalid.
    // Calling it when the handle is already INVALID_SOCKET does nothing.
    void GSKSocket::Close() 
    {
        const bool bAlreadyClosed = (m_iSocket == (int)INVALID_SOCKET);
        if (bAlreadyClosed)
        {
            return;
        }

        LOGI_GSKCNET_INFO("close socket= %d", m_iSocket);
        glodon::closeSocket(m_iSocket);
        glodon::unInitlizeSocket();
        m_iSocket = INVALID_SOCKET; 
    }
    

    收到请求回应的消息后需要遍历已发送的消息的map(m_hasSendedMap),然后调用相应的回调,发送成功或失败,代码逻辑如下:

    // Matches a response with its pending request in m_hasSendedMap (keyed by
    // sequence) and queues the request's callback with the success code.
    //
    // The response may arrive before the request has been inserted into the
    // map, so the lookup is retried up to 50 times, 10 ms apart. A bounded loop
    // is used instead of recursion so an unknown sequence cannot loop forever.
    //
    // Ownership: stGSKResponse is either pushed to the callback queue or
    // deleted here; the matched request is removed from the map and deleted.
    void GSKCNet::updateSendedMap(GSKResponse *stGSKResponse)
    {
        // Looks up and removes the matching entry inside ONE critical section,
        // so the iterator is never used after the mutex has been released.
        // Returns true when an entry existed; pOut receives its request pointer.
        auto takePending = [this, stGSKResponse](GSKRequest*& pOut) -> bool
        {
            bool bFound = false;
            m_sendedmapmutex.lock();
            std::unordered_map<unsigned long long, GSKRequest*>::iterator iter =
                m_hasSendedMap.find(stGSKResponse->GetSequence());
            if (iter != m_hasSendedMap.end())
            {
                pOut = iter->second;
                m_hasSendedMap.erase(iter);
                bFound = true;
            }
            m_sendedmapmutex.unlock();
            return bFound;
        };

        const int kMaxRetries = 50;
        for (int attempt = 0; attempt <= kMaxRetries; ++attempt)
        {
            // The first lookup is immediate; every retry waits 10 ms first.
            if (attempt > 0)
            {
                glodon::sleep(10);
            }

            GSKRequest* pSendRequest = nullptr;
            if (!takePending(pSendRequest))
            {
                continue;
            }

            if (pSendRequest == nullptr)
            {
                // Defensive: an entry without a request has no callback to run.
                delete stGSKResponse;
                return;
            }

            stGSKResponse->code = GSKNET_SEND_MSG_SUCCEED_CODE;
            stGSKResponse->cb = pSendRequest->GetCallback();
            stGSKResponse->m_reqBody = pSendRequest->GetBody();

            PushToCallbackQueue(stGSKResponse);
            delete pSendRequest;
            return;
        }

        // No request showed up for this sequence: drop the response.
        delete stGSKResponse;
    }
    

    接收线程需要不断的遍历已发送消息的map,从而判断消息发送是否超时

    void GSKCNet::handleTimeoutMsg()
    {
        for(std::unordered_map<unsigned long long, GSKRequest*>::iterator iter = m_hasSendedMap.begin();
            iter != m_hasSendedMap.end();)
        {
            GSKRequest* pRequest = iter->second;
            //超时
            if(pRequest && (pRequest->GetTimeout() + pRequest->GetSendTime() < GSKRequest::GetCurrentTimeMsec()))
            {
                //回调函数
                handleMsgCallBack(GSKNET_CLIENT_REQUEST_TIMEOUT_CODE, pRequest->GetSequence(), 
                                    pRequest->GetReqCmd(), pRequest->getPacket()->getWClt(),
                                  pRequest->GetSubCmd(),pRequest->GetBody(),pRequest->GetCallback());
    
                m_sendedmapmutex.lock();
                m_hasSendedMap.erase(iter++);
                m_sendedmapmutex.unlock();
                delete pRequest;
                pRequest = NULL;
            }
            else
            {
                iter++;
            }
        }
    }
    
    

    接收线程需要检测连接是否还活着,一旦发现连接断开,就尝试重连

    // Tries to reconnect when the connection looks dead.
    //
    // A reconnect is attempted only if a last-active time has been recorded,
    // more than (2 * m_iHeartInterval + 5) seconds passed since then, and more
    // than 20 seconds passed since the last connect time. When the reconnect
    // fails the client is marked as not logged in.
    void GSKCNet::checkNetActive()
    {
        if (GetLastActiveTime() == 0)
        {
            return;
        }
        if (time(NULL) - GetLastActiveTime() <= (m_iHeartInterval * 2 + 5))
        {
            return;
        }
        if (time(NULL) - GetLastConnectTime() <= 20)
        {
            return;
        }

        if (!TryConnectServer(5, CHECK_NET_ACTIVE_CNETWORK))
        {
            LOGI_GSKCNET_INFO("%s", "checkNetActive()->TryConnectServer() is failed");
            SetIsLogged(false);
            return;
        }

        LOGI_GSKCNET_INFO("%s", "checkNetActive()->TryConnectServer() is succeed");
    }
    

    当接收线程退出时,它需要先等待发送线程退出,因为它需要处理已发送消息的map,针对这些已发送的消息,调用发送消息失败的回调

    void GSKCNet::ClearConext()
    {
        m_recvBuffer.retrieveAll();
        if (m_pGSKSocket)
        {
            m_pGSKSocket->Close();
        }
        if (g_netConnectCallback)
        {
            g_netConnectCallback(false);
        }
    
        for (std::unordered_map<unsigned long long, GSKRequest*>::iterator iter = m_hasSendedMap.begin();
            iter != m_hasSendedMap.end();)
        {
            GSKRequest* pRequest = iter->second;
            if (pRequest)
            {
                //回调函数
                handleMsgCallBack(GSKNET_SEND_MSG_FAILED_CODE, pRequest->GetSequence(), 
                    pRequest->GetReqCmd(), pRequest->getPacket()->getWClt(),
                    pRequest->GetSubCmd(), pRequest->GetBody(), pRequest->GetCallback());
    
                m_sendedmapmutex.lock();
                m_hasSendedMap.erase(iter++);
                m_sendedmapmutex.unlock();
    
                delete pRequest;
                pRequest = NULL;
            }
        }
    }
    

    相关文章

      网友评论

          本文标题:IM客户端开发(3)——接收线程

          本文链接:https://www.haomeiwen.com/subject/rkxjgctx.html