美文网首页
IM客户端开发(2)——发送线程

IM客户端开发(2)——发送线程

作者: Magic11 | 来源:发表于2019-12-05 16:33 被阅读0次

1、 建立连接成功后,上层app会立即发送登录请求,登录成功之后就可以发送消息了,发送登录请求和发送消息都是调用底层的Request接口,过程如下:

Java_com_vkansee_jnidemo_jniapi_GIMNetApi_Request(JNIEnv *_env, jclass _thiz, jint _cmd,
                                                  jint _subCmd, jstring _toUid, jstring _routeId,
                                                  jint _timeOut, jint _reqeustId, jbyteArray _msg) {
    jsize len  = _env->GetArrayLength(_msg);
    jbyte *pMsg = (jbyte*)_env->GetByteArrayElements(_msg, 0);

    char* buffer = (char*)malloc(len+1);
    memcpy(buffer, pMsg, len);
    buffer[len] = '\0';
    int iRet = GSKCNet::sharedInstance()->Request(iCmd , iSubCmd, strToUid, strRouteId,
    iTimeout, buffer, (int)len, [=](__GSKNetArg stArg) {
        
        (g_jvm)->GetEnv((void **) &env, g_version);    //注意由于回调是在另外一个线程中调用的,所以需要先把jvm attach到调用当前回调的线程
        status = g_jvm->AttachCurrentThread(&env, NULL);

        int iCode = stArg.iCode;
        int iSeq = stArg.iSeq;
        int iCmd = stArg.iCmd;

        env->CallStaticVoidMethod(cls, mid,  iCode, iSeq, iCmd, iRequestId, jarray);
        
        g_jvm->DetachCurrentThread();    //调用结束后,需要把jvm detach到调用当前回调的线程

    });
    free(buffer);

    _env->ReleaseByteArrayElements(_msg, pMsg, 0);

    return iRet;
}

Request先根据请求将消息封装成一个消息包Packet,然后再将这个Packet和相应的回调callback封装到一个请求里面,最后将这个请求放到发送队列里面

unsigned long long GSKCNet::Request(int cmd, int subCmd, const std::string &toUid, const std::string &routeId,
                     int iTimeout, const char *pMsg, int iLen, GSKCNetCallbackType callback)
{
    GSKPacket *pPacket = createPacket(cmd,subCmd,toUid);
    pPacket->setEncryptType(m_encrypttype);
    pPacket->setBody(pMsg, iLen);
    pPacket->setRouteId(routeId);
    GSKRequest* pRequest = new GSKRequest(pPacket, iTimeout, callback);

    PushToSendQueue(pRequest);
    return  pPacket->getSeqid();
}

消息包的封装如下:

GSKPacket *GSKCNet::createPacket(int cmd, int subcmd, const std::string &toUid)
{
    //消息包
    GSKPacket *pPacket = GSKPacket::create();
    pPacket->setCmd(cmd);
    pPacket->setSubCmd(subcmd);
    pPacket->setFromUid(m_sendUid);
    pPacket->setToUid(toUid);
    if (cmd == CMD_HEART)
    {
        pPacket->setSeqid(0);
    }
    else{
        pPacket->setSeqid(createPageId());
    }
    pPacket->setVersion(m_appVersion);

    pPacket->setAppId(m_appId);
    return pPacket;
}

其中每个消息包有个序列号Seqid,它是随时间递增的

unsigned long long GSKCNet::createPageId()
{
    std::lock_guard<std::mutex> loc(m_refcount);
    static volatile unsigned long long id = GSKRequest::GetCurrentTimeMsec();
    if (id == 0xFFFFFFFFFFFFFFFF)
    {
        id = GSKRequest::GetCurrentTimeMsec();
    }
    else
    {
        ++id;
    }
    return id;
}

可以看到这个Id基本能保证唯一性
Request会把这个请求放到一个发送队列里面std::queue< GSKRequest* > m_sendQueue

void GSKCNet::PushToSendQueue(GSKRequest *req)
{
    std::lock_guard<std::mutex> loc(m_lock);

    if (SUBCMD_LOGIN_MSG == req->GetReqCmd())
    {
        bool isExist = false;
        std::queue<GSKRequest*> tmpQ(m_sendQueue);
        while (!tmpQ.empty())
        {
            GSKRequest* pRequest = tmpQ.front();
            tmpQ.pop();
            if (pRequest->GetReqCmd() == SUBCMD_LOGIN_MSG)
            {
                isExist = true;
                break;
            }
        }
        if (isExit)
        {
            delete req;
            req = NULL;
            m_sendcv.notify_all();
        }
        else
        {
            m_sendQueue.push(req);
            m_sendcv.notify_all();
        }
        
    }

    m_sendQueue.push(req);
    m_sendcv.notify_all();
}

下面看下发送线程如何处理这个发送队列
首先发送线程是在每次连接完成后启动

void GSKCNet::Start()
{
    //开启发送线程
    m_workThread = std::thread(std::bind( &GSKCNet::Loop, this));
    m_workThread.detach();
    //开启接收线程
    m_recvThread = std::thread(std::bind(&GSKCNet::recvLoop, this));
    m_recvThread.detach();
    //开启回调处理线程
    m_callbackThread = std::thread(std::bind(&GSKCNet::callbackLoop, this));
    m_callbackThread.detach();
}

发送线程不断地检查发送队列是否为空,如果为空则阻塞,如果不为空,则发送一条消息

void GSKCNet::Loop()
{
    SetWorking(true);
    SetExitSendLoop(false);
    std::mutex sendconlock;
    std::unique_lock < std::mutex > lck(sendconlock);
    //线程正在运行
    while (GetWorking())
    {
        if (SendQueueIsEmpty())
        {
            if (GetWorking())
            {
                m_sendcv.wait(lck);
            }
            else
            {
                break;
            }
        }
        
        if (GetWorking())
        {
            //检查消息发送队列,有消息就发送
            sendMsg();
        }
    }
    ClearSendQueue();
    SetIsLogged(false);
    SetExitSendLoop(true);
}

发送消息的过程分为如下几步:
1) 从消息队列获取到最前面的一条消息
2) 如果消息体需要加密,对消息体进行加密
3) 对消息体进行拼接并且序列化
4) 发送消息

a.  消息发送成功
    将已发送的请求放入一个已发送的map中  unordered_map< unsigned long long, GSKRequest* > m_hasSendedMap
b.  消息发送失败
    1.1 查看是否正在尝试连接
          如果正在重连
              等待连接1秒
                  若连接成功,再次发送消息
                  若发送成功,加入已发送的map中
                  若发送失败,调用发送失败回调
        1.2 没有重连
                尝试重连
                    若连接成功,
                        再次发送消息
                        若发送成功,加入已发送的map中
                        若发送失败,调用发送失败回调
                    若连接失败,
                        查看是否正在尝试连接
                            如果正在重连
                               等待连接1秒
                                    若连接成功,再次发送消息
                                    若发送成功,加入已发送的map中
                                    若发送失败,调用发送失败回调

代码逻辑如下:

void GSKCNet::sendMsg()
{
    //发送队列有消息
    GSKRequest* pRequest = GetFrontRequest();

    std::string outBody;
    Encrypt(pRequest->GetBody(), pRequest->getPacket()->getRouteId(), outBody);
    pRequest->getPacket()->setBody(outBody);
    //包头拼接完成并且包体已经序列化
    std::string msg = pRequest->GetEncodeBody();
    int iRet = m_pGSKSocket->Send(msg.data(), (int)msg.size());
    //发送失败
    if(iRet < 0)
    {
        if (GetTryReconnected())
        {
            //等待连接完成,然后发送消息
            bool tryRec = GetTryReconnected();
            int num = 0;
            while (tryRec && GetWorking())
            {
                glodon::sleep(10);
                tryRec = GetTryReconnected();
                ++num;
                if (num > 100)
                {
                    break;
                }
            }

            if (!GetWorking())
            {
                handleMsgCallBack(GSKNET_SEND_MSG_FAILED_CODE, pRequest->GetSequence(), pRequest->GetReqCmd(),
                pRequest->getPacket()->getWClt(),pRequest->GetSubCmd(), pRequest->GetBody(), pRequest->GetCallback());
                delete pRequest;
                pRequest = nullptr;
                return;
            }

            int iRets = m_pGSKSocket->Send(msg.data(), (int)msg.size());
            if (iRets < 0)
            {
                handleMsgCallBack(GSKNET_SEND_MSG_FAILED_CODE, pRequest->GetSequence(), pRequest->GetReqCmd(),
                pRequest->getPacket()->getWClt(), pRequest->GetSubCmd(), pRequest->GetBody(), pRequest->GetCallback());
                delete pRequest;
                pRequest = nullptr;
            }
            else
            {
                SetLastActiveTime(time(NULL));
                pRequest->SetSendTime(GSKRequest::GetCurrentTimeMsec());
                PushToSendedMap(pRequest);
            }
        }
        else
        {
            if (TryConnectServer(1, SEND_FAILED_CNETWORK))
            {
                int iRets = m_pGSKSocket->Send(msg.data(), (int)msg.size());
                if (iRets < 0)
                {
                    LOGI_GSKCNET_INFO("second sendMsg failed iRets = %d", iRets);
                    handleMsgCallBack(GSKNET_SEND_MSG_FAILED_CODE, pRequest->GetSequence(), pRequest->GetReqCmd(), pRequest->getPacket()->getWClt(),
                        pRequest->GetSubCmd(), pRequest->GetBody(), pRequest->GetCallback());
                    delete pRequest;
                    pRequest = nullptr;
                }
                else
                {
                    SetLastActiveTime(time(NULL));
                    pRequest->SetSendTime(GSKRequest::GetCurrentTimeMsec());
                    PushToSendedMap(pRequest);
                }
            }
            else
            {
                if (GetTryReconnected())
                {
                    bool tryRec = GetTryReconnected();
                    //等待连接完成,然后发送消息
                    int num = 0;
                    while (tryRec && GetWorking())
                    {
                        glodon::sleep(10);
                        tryRec = GetTryReconnected();
                        ++num;
                        if (num > 100)
                        {
                            break;
                        }
                    }

                    if (!GetWorking())
                    {
                        handleMsgCallBack(GSKNET_SEND_MSG_FAILED_CODE, pRequest->GetSequence(), pRequest->GetReqCmd(),
                        pRequest->getPacket()->getWClt(), pRequest->GetSubCmd(), pRequest->GetBody(), pRequest->GetCallback());
                        delete pRequest;
                        pRequest = nullptr;
                        return;
                    }

                    int iRets = m_pGSKSocket->Send(msg.data(), (int)msg.size());
                    if (iRets < 0)
                    {
                        handleMsgCallBack(GSKNET_SEND_MSG_FAILED_CODE, pRequest->GetSequence(), pRequest->GetReqCmd(),
                        pRequest->getPacket()->getWClt(), pRequest->GetSubCmd(), pRequest->GetBody(), pRequest->GetCallback());
                        delete pRequest;
                        pRequest = nullptr;
                    }
                    else
                    {
                        SetLastActiveTime(time(NULL));
                        pRequest->SetSendTime(GSKRequest::GetCurrentTimeMsec());
                        PushToSendedMap(pRequest);
                    }
                }
                else
                {
                    handleMsgCallBack(GSKNET_SEND_MSG_FAILED_CODE, pRequest->GetSequence(), pRequest->GetReqCmd(), 
                    pRequest->getPacket()->getWClt(), pRequest->GetSubCmd(), pRequest->GetBody(), pRequest->GetCallback());
                    delete pRequest;
                    pRequest = nullptr;
                }
            }
        }
    }
    else
    {
        SetLastActiveTime(time(NULL));
        pRequest->SetSendTime(GSKRequest::GetCurrentTimeMsec());
        PushToSendedMap(pRequest);
    }
}

其中,包头拼接及序列化的GetEncodeBody的过程如下:

std::string GSKPacket::encode ( )
{
    m_strBuffer.clear();
    packIn8( m_strBuffer, STX );

    if (m_wCmd == CMD_HEART)
    {
        packIn16(m_strBuffer, m_wCmd); //wCommand

        //开始写位置
        packIn8(m_strBuffer, ETX);
        return m_strBuffer;
    }
    packIn16(m_strBuffer, m_wCmd); //wCommand
    uint32_t dwLen = m_strBody.size() + m_toUid.size() + m_fromUid.size() + m_routeId.size() + m_tranck.size() + GSK_PKG_HEAD_SIZE + 2;
    if (m_iVersion > 1)
    {
        dwLen = dwLen + 1;
    }
    
    packIn32( m_strBuffer, dwLen ); //dwLength

    packIn16( m_strBuffer, m_subCmd ); //subCommand
    //appid
    packIn64(m_strBuffer,m_appId);
    packIn32( m_strBuffer, (int)m_fromUid.size()); //m_fromUid的长度
    //发消息的人的id
    if( m_fromUid.size())
    {
        m_strBuffer.append(m_fromUid.c_str(),m_fromUid.size());
    }
    packIn32(m_strBuffer, (int)m_toUid.size()); //m_toUid的长度
    //收消息的人
    if(m_toUid.size())
    {
        m_strBuffer.append(m_toUid.c_str(),m_toUid.size());
    }

    packIn32(m_strBuffer, (int)m_routeId.size()); //路由id的长度
    //收消息的人
    if(m_routeId.size())
    {
        m_strBuffer.append(m_routeId.c_str(),m_routeId.size());
    }

    packIn32(m_strBuffer, (int)m_tranck.size());
    if (m_tranck.size())
    {
        m_strBuffer.append(m_tranck.c_str(), m_tranck.size());
    }

    //1:iOS 2:android 3:PC设备类型
    packIn16( m_strBuffer, getWClt());
    //ip
    packIn64(m_strBuffer, m_ip);
    //端口号
    packIn32(m_strBuffer, m_port);
    //确认关键字
    packIn64(m_strBuffer,m_seqid);
    //最后时间
    packIn64(m_strBuffer,m_lastTime);
    //版本号
    packIn32( m_strBuffer, m_iVersion);
    if (m_iVersion > 1)
    {
        packIn8(m_strBuffer, (unsigned char)m_iEncryptType);
    }
    //序列化消息体
    if(m_strBody.size() > 0)
    {
        m_strBuffer.append(m_strBody);
    }
    //开始写位置
     packIn8( m_strBuffer, ETX);

    return m_strBuffer;
}

发送消息的过程如下:

int GSKSocket::TrySend(const char* szBuf, int iBufLen , int iTimeout )
{
    int iRet = 0;
    fd_set stFdSet;
    FD_ZERO(&stFdSet);
    FD_SET(m_iSocket, &stFdSet);

    struct timeval stTimeOut;
    stTimeOut.tv_sec = iTimeout;
    stTimeOut.tv_usec = 0;

    int iSendBytes = 0;
    while (iSendBytes < iBufLen)
    {
        if ((iRet = select(m_iSocket + 1, NULL, &stFdSet, NULL, &stTimeOut)) == -1)
        {
            //select socket for write failed
            return -21; // 返回值无意义,负值即可
        }
        else if (iRet == 0)
        {
            //select socket for write timeout
            errno = ETIMEDOUT;
            return -22; // 返回值无意义,负值即可
        }

        if ((iRet = send(m_iSocket, szBuf+iSendBytes, iBufLen-iSendBytes, 0)) == -1)
        {
            if (errno == EWOULDBLOCK || errno == EINPROGRESS)
            {
                continue;
            }
            else
            {
                //send data failed
                return -23; // 返回值无意义,负值即可
            }
        }
        iSendBytes += iRet;
    }

    return iSendBytes;
}

注意发送线程退出的时,需要清理发送队列

void GSKCNet::ClearSendQueue()
{
    std::queue<GSKRequest*> tmpQ;
    SwapSendQueue(tmpQ);

    while (!tmpQ.empty())
    {
        GSKRequest* pRequest = tmpQ.front();
        tmpQ.pop();
        if (pRequest)
        {
            handleMsgCallBack(GSKNET_SEND_MSG_FAILED_CODE, pRequest->GetSequence(), 
                pRequest->GetReqCmd(), pRequest->getPacket()->getWClt(),
                pRequest->GetSubCmd(), pRequest->GetBody(), pRequest->GetCallback());
            delete pRequest;
        }
    }
}

相关文章

网友评论

      本文标题:IM客户端开发(2)——发送线程

      本文链接:https://www.haomeiwen.com/subject/bfbdgctx.html