1、 建立连接成功后,上层app会立即发送登录请求,登录成功之后就可以发送消息了,发送登录请求和发送消息都是调用底层的Request接口,过程如下:
Java_com_vkansee_jnidemo_jniapi_GIMNetApi_Request(JNIEnv *_env, jclass _thiz, jint _cmd,
jint _subCmd, jstring _toUid, jstring _routeId,
jint _timeOut, jint _reqeustId, jbyteArray _msg) {
jsize len = _env->GetArrayLength(_msg);
jbyte *pMsg = (jbyte*)_env->GetByteArrayElements(_msg, 0);
char* buffer = (char*)malloc(len+1);
memcpy(buffer, pMsg, len);
buffer[len] = '\0';
int iRet = GSKCNet::sharedInstance()->Request(iCmd , iSubCmd, strToUid, strRouteId,
iTimeout, buffer, (int)len, [=](__GSKNetArg stArg) {
(g_jvm)->GetEnv((void **) &env, g_version); //注意由于回调是在另外一个线程中调用的,所以需要先把jvm attach到调用当前回调的线程
status = g_jvm->AttachCurrentThread(&env, NULL);
int iCode = stArg.iCode;
int iSeq = stArg.iSeq;
int iCmd = stArg.iCmd;
env->CallStaticVoidMethod(cls, mid, iCode, iSeq, iCmd, iRequestId, jarray);
g_jvm->DetachCurrentThread(); //调用结束后,需要把jvm detach到调用当前回调的线程
});
free(buffer);
_env->ReleaseByteArrayElements(_msg, pMsg, 0);
return iRet;
}
Request先根据请求将消息封装成一个消息包Packet,然后再将这个Packet和相应的回调callback封装到一个请求里面,最后将这个请求放到发送队列里面
unsigned long long GSKCNet::Request(int cmd, int subCmd, const std::string &toUid, const std::string &routeId,
int iTimeout, const char *pMsg, int iLen, GSKCNetCallbackType callback)
{
GSKPacket *pPacket = createPacket(cmd,subCmd,toUid);
pPacket->setEncryptType(m_encrypttype);
pPacket->setBody(pMsg, iLen);
pPacket->setRouteId(routeId);
GSKRequest* pRequest = new GSKRequest(pPacket, iTimeout, callback);
PushToSendQueue(pRequest);
return pPacket->getSeqid();
}
消息包的封装如下:
GSKPacket *GSKCNet::createPacket(int cmd, int subcmd, const std::string &toUid)
{
//消息包
GSKPacket *pPacket = GSKPacket::create();
pPacket->setCmd(cmd);
pPacket->setSubCmd(subcmd);
pPacket->setFromUid(m_sendUid);
pPacket->setToUid(toUid);
if (cmd == CMD_HEART)
{
pPacket->setSeqid(0);
}
else{
pPacket->setSeqid(createPageId());
}
pPacket->setVersion(m_appVersion);
pPacket->setAppId(m_appId);
return pPacket;
}
其中每个消息包有个序列号Seqid,它是随时间递增的
unsigned long long GSKCNet::createPageId()
{
std::lock_guard<std::mutex> loc(m_refcount);
static volatile unsigned long long id = GSKRequest::GetCurrentTimeMsec();
if (id == 0xFFFFFFFFFFFFFFFF)
{
id = GSKRequest::GetCurrentTimeMsec();
}
else
{
++id;
}
return id;
}
可以看到这个Id基本能保证唯一性
Request会把这个请求放到一个发送队列里面std::queue< GSKRequest* > m_sendQueue
void GSKCNet::PushToSendQueue(GSKRequest *req)
{
std::lock_guard<std::mutex> loc(m_lock);
if (SUBCMD_LOGIN_MSG == req->GetReqCmd())
{
bool isExist = false;
std::queue<GSKRequest*> tmpQ(m_sendQueue);
while (!tmpQ.empty())
{
GSKRequest* pRequest = tmpQ.front();
tmpQ.pop();
if (pRequest->GetReqCmd() == SUBCMD_LOGIN_MSG)
{
isExist = true;
break;
}
}
if (isExit)
{
delete req;
req = NULL;
m_sendcv.notify_all();
}
else
{
m_sendQueue.push(req);
m_sendcv.notify_all();
}
}
m_sendQueue.push(req);
m_sendcv.notify_all();
}
下面看下发送线程如何处理这个发送队列
首先发送线程是在每次连接完成后启动
void GSKCNet::Start()
{
//开启发送线程
m_workThread = std::thread(std::bind( &GSKCNet::Loop, this));
m_workThread.detach();
//开启接收线程
m_recvThread = std::thread(std::bind(&GSKCNet::recvLoop, this));
m_recvThread.detach();
//开启回调处理线程
m_callbackThread = std::thread(std::bind(&GSKCNet::callbackLoop, this));
m_callbackThread.detach();
}
发送线程不断地检查发送队列是否为空,如果为空则阻塞,如果不为空,则发送一条消息
void GSKCNet::Loop()
{
SetWorking(true);
SetExitSendLoop(false);
std::mutex sendconlock;
std::unique_lock < std::mutex > lck(sendconlock);
//线程正在运行
while (GetWorking())
{
if (SendQueueIsEmpty())
{
if (GetWorking())
{
m_sendcv.wait(lck);
}
else
{
break;
}
}
if (GetWorking())
{
//检查消息发送队列,有消息就发送
sendMsg();
}
}
ClearSendQueue();
SetIsLogged(false);
SetExitSendLoop(true);
}
发送消息的过程分为如下几步:
1) 从消息队列获取到最前面的一条消息
2) 如果消息体需要加密,对消息体进行加密
3) 对消息体进行拼接并且序列化
4) 发送消息
a. 消息发送成功
将已发送的请求放入一个已发送的map中 unordered_map< unsigned long long, GSKRequest* > m_hasSendedMap
b. 消息发送失败
1.1 查看是否正在尝试连接
如果正在重连
等待连接1秒
若连接成功,再次发送消息
若发送成功,加入已发送的map中
若发送失败,调用发送失败回调
1.2 没有重连
尝试重连
若连接成功,
再次发送消息
若发送成功,加入已发送的map中
若发送失败,调用发送失败回调
若连接失败,
查看是否正在尝试连接
如果正在重连
等待连接1秒
若连接成功,再次发送消息
若发送成功,加入已发送的map中
若发送失败,调用发送失败回调
代码逻辑如下:
void GSKCNet::sendMsg()
{
//发送队列有消息
GSKRequest* pRequest = GetFrontRequest();
std::string outBody;
Encrypt(pRequest->GetBody(), pRequest->getPacket()->getRouteId(), outBody);
pRequest->getPacket()->setBody(outBody);
//包头拼接完成并且包体已经序列化
std::string msg = pRequest->GetEncodeBody();
int iRet = m_pGSKSocket->Send(msg.data(), (int)msg.size());
//发送失败
if(iRet < 0)
{
if (GetTryReconnected())
{
//等待连接完成,然后发送消息
bool tryRec = GetTryReconnected();
int num = 0;
while (tryRec && GetWorking())
{
glodon::sleep(10);
tryRec = GetTryReconnected();
++num;
if (num > 100)
{
break;
}
}
if (!GetWorking())
{
handleMsgCallBack(GSKNET_SEND_MSG_FAILED_CODE, pRequest->GetSequence(), pRequest->GetReqCmd(),
pRequest->getPacket()->getWClt(),pRequest->GetSubCmd(), pRequest->GetBody(), pRequest->GetCallback());
delete pRequest;
pRequest = nullptr;
return;
}
int iRets = m_pGSKSocket->Send(msg.data(), (int)msg.size());
if (iRets < 0)
{
handleMsgCallBack(GSKNET_SEND_MSG_FAILED_CODE, pRequest->GetSequence(), pRequest->GetReqCmd(),
pRequest->getPacket()->getWClt(), pRequest->GetSubCmd(), pRequest->GetBody(), pRequest->GetCallback());
delete pRequest;
pRequest = nullptr;
}
else
{
SetLastActiveTime(time(NULL));
pRequest->SetSendTime(GSKRequest::GetCurrentTimeMsec());
PushToSendedMap(pRequest);
}
}
else
{
if (TryConnectServer(1, SEND_FAILED_CNETWORK))
{
int iRets = m_pGSKSocket->Send(msg.data(), (int)msg.size());
if (iRets < 0)
{
LOGI_GSKCNET_INFO("second sendMsg failed iRets = %d", iRets);
handleMsgCallBack(GSKNET_SEND_MSG_FAILED_CODE, pRequest->GetSequence(), pRequest->GetReqCmd(), pRequest->getPacket()->getWClt(),
pRequest->GetSubCmd(), pRequest->GetBody(), pRequest->GetCallback());
delete pRequest;
pRequest = nullptr;
}
else
{
SetLastActiveTime(time(NULL));
pRequest->SetSendTime(GSKRequest::GetCurrentTimeMsec());
PushToSendedMap(pRequest);
}
}
else
{
if (GetTryReconnected())
{
bool tryRec = GetTryReconnected();
//等待连接完成,然后发送消息
int num = 0;
while (tryRec && GetWorking())
{
glodon::sleep(10);
tryRec = GetTryReconnected();
++num;
if (num > 100)
{
break;
}
}
if (!GetWorking())
{
handleMsgCallBack(GSKNET_SEND_MSG_FAILED_CODE, pRequest->GetSequence(), pRequest->GetReqCmd(),
pRequest->getPacket()->getWClt(), pRequest->GetSubCmd(), pRequest->GetBody(), pRequest->GetCallback());
delete pRequest;
pRequest = nullptr;
return;
}
int iRets = m_pGSKSocket->Send(msg.data(), (int)msg.size());
if (iRets < 0)
{
handleMsgCallBack(GSKNET_SEND_MSG_FAILED_CODE, pRequest->GetSequence(), pRequest->GetReqCmd(),
pRequest->getPacket()->getWClt(), pRequest->GetSubCmd(), pRequest->GetBody(), pRequest->GetCallback());
delete pRequest;
pRequest = nullptr;
}
else
{
SetLastActiveTime(time(NULL));
pRequest->SetSendTime(GSKRequest::GetCurrentTimeMsec());
PushToSendedMap(pRequest);
}
}
else
{
handleMsgCallBack(GSKNET_SEND_MSG_FAILED_CODE, pRequest->GetSequence(), pRequest->GetReqCmd(),
pRequest->getPacket()->getWClt(), pRequest->GetSubCmd(), pRequest->GetBody(), pRequest->GetCallback());
delete pRequest;
pRequest = nullptr;
}
}
}
}
else
{
SetLastActiveTime(time(NULL));
pRequest->SetSendTime(GSKRequest::GetCurrentTimeMsec());
PushToSendedMap(pRequest);
}
}
其中,包头拼接及序列化的GetEncodeBody的过程如下:
std::string GSKPacket::encode ( )
{
m_strBuffer.clear();
packIn8( m_strBuffer, STX );
if (m_wCmd == CMD_HEART)
{
packIn16(m_strBuffer, m_wCmd); //wCommand
//开始写位置
packIn8(m_strBuffer, ETX);
return m_strBuffer;
}
packIn16(m_strBuffer, m_wCmd); //wCommand
uint32_t dwLen = m_strBody.size() + m_toUid.size() + m_fromUid.size() + m_routeId.size() + m_tranck.size() + GSK_PKG_HEAD_SIZE + 2;
if (m_iVersion > 1)
{
dwLen = dwLen + 1;
}
packIn32( m_strBuffer, dwLen ); //dwLength
packIn16( m_strBuffer, m_subCmd ); //subCommand
//appid
packIn64(m_strBuffer,m_appId);
packIn32( m_strBuffer, (int)m_fromUid.size()); //m_fromUid的长度
//发消息的人的id
if( m_fromUid.size())
{
m_strBuffer.append(m_fromUid.c_str(),m_fromUid.size());
}
packIn32(m_strBuffer, (int)m_toUid.size()); //m_toUid的长度
//收消息的人
if(m_toUid.size())
{
m_strBuffer.append(m_toUid.c_str(),m_toUid.size());
}
packIn32(m_strBuffer, (int)m_routeId.size()); //路由id的长度
//收消息的人
if(m_routeId.size())
{
m_strBuffer.append(m_routeId.c_str(),m_routeId.size());
}
packIn32(m_strBuffer, (int)m_tranck.size());
if (m_tranck.size())
{
m_strBuffer.append(m_tranck.c_str(), m_tranck.size());
}
//1:iOS 2:android 3:PC设备类型
packIn16( m_strBuffer, getWClt());
//ip
packIn64(m_strBuffer, m_ip);
//端口号
packIn32(m_strBuffer, m_port);
//确认关键字
packIn64(m_strBuffer,m_seqid);
//最后时间
packIn64(m_strBuffer,m_lastTime);
//版本号
packIn32( m_strBuffer, m_iVersion);
if (m_iVersion > 1)
{
packIn8(m_strBuffer, (unsigned char)m_iEncryptType);
}
//序列化消息体
if(m_strBody.size() > 0)
{
m_strBuffer.append(m_strBody);
}
//开始写位置
packIn8( m_strBuffer, ETX);
return m_strBuffer;
}
发送消息的过程如下:
int GSKSocket::TrySend(const char* szBuf, int iBufLen , int iTimeout )
{
int iRet = 0;
fd_set stFdSet;
FD_ZERO(&stFdSet);
FD_SET(m_iSocket, &stFdSet);
struct timeval stTimeOut;
stTimeOut.tv_sec = iTimeout;
stTimeOut.tv_usec = 0;
int iSendBytes = 0;
while (iSendBytes < iBufLen)
{
if ((iRet = select(m_iSocket + 1, NULL, &stFdSet, NULL, &stTimeOut)) == -1)
{
//select socket for write failed
return -21; // 返回值无意义,负值即可
}
else if (iRet == 0)
{
//select socket for write timeout
errno = ETIMEDOUT;
return -22; // 返回值无意义,负值即可
}
if ((iRet = send(m_iSocket, szBuf+iSendBytes, iBufLen-iSendBytes, 0)) == -1)
{
if (errno == EWOULDBLOCK || errno == EINPROGRESS)
{
continue;
}
else
{
//send data failed
return -23; // 返回值无意义,负值即可
}
}
iSendBytes += iRet;
}
return iSendBytes;
}
注意发送线程退出的时,需要清理发送队列
void GSKCNet::ClearSendQueue()
{
std::queue<GSKRequest*> tmpQ;
SwapSendQueue(tmpQ);
while (!tmpQ.empty())
{
GSKRequest* pRequest = tmpQ.front();
tmpQ.pop();
if (pRequest)
{
handleMsgCallBack(GSKNET_SEND_MSG_FAILED_CODE, pRequest->GetSequence(),
pRequest->GetReqCmd(), pRequest->getPacket()->getWClt(),
pRequest->GetSubCmd(), pRequest->GetBody(), pRequest->GetCallback());
delete pRequest;
}
}
}
网友评论