TCP分包
在发生一个消息或一帧数据时,通过一定的处理,使接收方能够从字节流中识别并截取一个个消息。
对于短连接的TCP服务,分包不是问题:
- 发送方主动关闭连接,就代表消息发送完毕;
- 接收方
read
的字节数变为0,代表消息接收完毕;
对于长连接的TCP服务,分包则需要从消息本身去着手:
- 可以使消息的长度保持固定;
- 可以让消息使用特定的字符或字符串去作为消息边界,如
"\r\n"
; - 可以在消息的头部加上长度字段;
- 利用消息自身的格式进行匹配,如JSON文件的
{...}
匹配机制
数据到达VS消息到达
muduo::net::buffer
无法设置回调函数的触发条件,意思也就是说,buffer
只能判断是否有数据到达,但具体的消息是否到达完整,则还需要进一步处理。为此,可以增加一层数据编解码器LengthHeaderCodec
用于把数据的parse任务进行抽离,这样就简化了代码,并且使得用户只需要关心消息是否已经到达,到达了就调用回调函数。换而言之,回调函数onMessage()
变为了onStringMessage()
,编写服务端和客户端的程序就只需要考虑消息到达后的发送,而不必考虑字节流的parse,因为LengthHeaderCodec
已经为我们提供了这一层服务。具体的调用过程为:
消息到达
=> Codec::OnMessage() # 注册为server_的消息到达回调函数
=> Codec::MessageCallback_() # 由server_.onStringMessage()注册
OK,服务端和客户端都可以将这个技巧进行运用了,甚至可以使用相同的编解码器对象以节省空间。
多线程初步(客户端)
客户端使用两个线程,一个用于读取键盘输入,另一个则用于打印从服务端接收到的数据。
实际上手
- 编解码器
LengthHeaderCodec
#ifndef LENGTHHEADERCODEC_H
#define LENGTHHEADERCODEC_H
#include <muduo/net/Buffer.h>
#include <muduo/net/TcpConnection.h>
#include <muduo/base/Logging.h>
#include <muduo/net/Endian.h>
class LengthHeaderCodec: muduo::noncopyable // this class is noncopyable
{
private:
using MessageCallbackFunc = std::function<void (const muduo::net::TcpConnectionPtr &, const muduo::string &, muduo::Timestamp time)>;
const int32_t nHeader = sizeof (int32_t); // length of header;
MessageCallbackFunc messageCallback_; // this is a r-value;
public:
explicit LengthHeaderCodec(const MessageCallbackFunc &func): messageCallback_(func)
{} // 此处的入参类型必须为const引用,因为messageCallback_是一个右值
void onMessage(const muduo::net::TcpConnectionPtr &conn, muduo::net::Buffer *buf, muduo::Timestamp time){
// LOG_INFO << " buf->readableBytes() is " << buf->readableBytes(); // test, to show if data is accepted;
// above LOG_INFO is OK, so bug comes below;
while(buf->readableBytes() >= nHeader){ // could have data, but completeness can't be guaranteed
// const void *dataptr = buf->peek();
// int32_t dataSz = *static_cast<const int32_t *>(dataptr);
// const int32_t len = muduo::net::sockets::networkToHost32(dataSz);
const int32_t len = buf->peekInt32(); // this function contains networkToHost32
if (len<0 || len>65536){
LOG_ERROR << "Invalid length " << len;
conn->shutdown();
break;
} else if(buf->readableBytes() >= nHeader+len){
buf->retrieve(nHeader);
muduo::string message_(buf->peek(), len);
// only @message_ is used here
messageCallback_(conn, message_, time); // string message callback, registered by server and client
buf->retrieve(len);
} else {
break; // message is not complete;
}
}
}
void send(muduo::net::TcpConnection *conn, const muduo::StringPiece &data){
muduo::net::Buffer buf;
buf.append(data.data(), data.size()); // convert to type: muduo::net::Buffer
int32_t len = static_cast<int32_t>(data.size());
// int32_t dataSz = muduo::net::sockets::hostToNetwork32(len);
// buf.prepend(&dataSz, sizeof dataSz);
buf.prependInt32(len); // this function contains hostToNetwork32
conn->send(&buf);
}
};
#endif /* LENGTHHEADERCODEC_H */
LengthHeaderCodec
一些注意点:
- 构造函数处的入参必须为常量引用(当然,值引用也是可以的),原因见注释
-
onMessage()
函数的入参conn
不能加const
限定符,因为TcpConnection
类的send()
函数只有非const
版本。(那为什么不将conn
直接声明为TcpConnectionPtr
呢?) - 字节流的编码和解码:无论具体用什么方式,都要确保双方能够互通,比如发送方将头部编为了网络传输模式,则接收方必须将其重新反转为原格式
- 服务端
ChatServerMyself
#ifndef CHAT_SERVER_MYSELF_H
#define CHAT_SERVER_MYSELF_H
#include <muduo/net/TcpConnection.h>
#include <muduo/net/TcpServer.h>
#include <muduo/net/Buffer.h>
#include <muduo/net/EventLoop.h>
#include <muduo/base/Logging.h>
#include <set>
#include "LengthHeaderCodec.h"
using namespace std::placeholders;
class ChatServerMyself // 为了方便直接将main函数写在了该文件内
{
private:
using connSet = std::set<muduo::net::TcpConnectionPtr>;
LengthHeaderCodec codec_;
muduo::net::TcpServer server_;
connSet connects_;
public:
ChatServerMyself(muduo::net::EventLoop *loop, muduo::net::InetAddress addr):
server_(loop, addr, "CHAT_SERVER_MYSELF"),
codec_(std::bind(&ChatServerMyself::onStringMessage, this, _1, _2, _3))
{
server_.setConnectionCallback(std::bind(&ChatServerMyself::onConnection, this, _1));
server_.setMessageCallback(std::bind(&LengthHeaderCodec::onMessage, &codec_, _1, _2, _3));
}
void start();
void onConnection(const muduo::net::TcpConnectionPtr &conn);
void onStringMessage(const muduo::net::TcpConnectionPtr &conn, const muduo::string &message, muduo::Timestamp time);
};
#endif /* CHAT_SERVER_MYSELF_H */
void ChatServerMyself::start(){
server_.start();
}
void ChatServerMyself::onConnection(const muduo::net::TcpConnectionPtr &conn){
LOG_INFO << "ASIO Server: " << conn->peerAddress().toIpPort() \
<< " -> " << conn->localAddress().toIpPort() << " is " \
<< (conn->connected() ? "UP" : "DOWN");
if(conn->connected()){
connects_.insert(conn);
} else {
connects_.erase(conn);
}
}
void ChatServerMyself::onStringMessage(const muduo::net::TcpConnectionPtr &, const muduo::string &message, muduo::Timestamp ){
for(connSet::iterator itr = connects_.begin(); itr != connects_.end(); ++itr){ // send data to all clients it connects;
codec_.send(itr->get(), message);
}
}
int main(int argc, char *argv[])
{
LOG_INFO << "pid = " << getpid();
if(argc > 1){
muduo::net::EventLoop loop;
uint16_t port = static_cast<uint16_t>(atoi(argv[1]));
muduo::net::InetAddress addr(port);
ChatServerMyself server(&loop, addr);
server.start();
loop.loop();
} else {
printf("Usage: %s <port>\n", argv[0]);
}
return 0;
}
ChatServerMyself
一些注意点:
- 探测到有新连接时应该做的事:将新的连接添加到自己的内部
set
变量中; - 有数据达到时应该做的事:将完整消息传递给与它有连接的所有客户端;
- 以编解码器作为中间层:这意味着,我们将完整消息到达时对应的回调函数
onStringMessage()
注册到编解码器codec_
中,再将有数据到达时对应的回调函数设置为codec_
中的字节流处理函数onMessage()
;这样一来,至少在编写服务端程序的时候,我们就无需再考虑如何从字节流中截取完整的消息了;
- 客户端
ChatClientMyself
#ifndef CHAT_CLIENT_MYSELF_H
#define CHAT_CLIENT_MYSELF_H
#include "LengthHeaderCodec.h"
#include <muduo/net/TcpClient.h>
#include <muduo/base/Mutex.h>
#include <muduo/base/Logging.h>
#include <muduo/net/EventLoopThread.h>
#include <iostream>
using namespace muduo;
using namespace muduo::net;
class ChatClientMyself
{
private:
TcpClient client_;
TcpConnectionPtr conn_;
LengthHeaderCodec codec_;
mutable MutexLock mutex_;
public:
ChatClientMyself(EventLoop *loop, const InetAddress &addr):
client_(loop, addr, "CHAT_CLIENT_MYSELF"),
codec_(std::bind(&ChatClientMyself::onStringMessage, this, _1, _2, _3))
{
client_.setConnectionCallback(std::bind(&ChatClientMyself::onConnection, this, _1));
client_.setMessageCallback(std::bind(&LengthHeaderCodec::onMessage, &codec_, _1, _2, _3));
client_.enableRetry(); // FIXME: what is this about?
}
void connect(){
client_.connect();
}
void disconnect(){ // FIXME: is disconnect necessary?
client_.disconnect();
}
void onConnection(const TcpConnectionPtr &conn){
LOG_INFO << "Cient Server Connection: " \
<< conn->localAddress().toIpPort() << " -> " \
<< conn->peerAddress().toIpPort() << " is " \
<< (conn->connected() ? "UP" : "DOWN");
if(conn->connected()){
conn_ = conn;
} else {
conn_.reset();
}
}
void onStringMessage(const TcpConnectionPtr &, const string &message, Timestamp){
// std::cout is not thread-safe, so we use printf instead;
printf("<<< %s\n", message.c_str()); // print accepted data;
}
// send data from STDIN input;
void write(const string &message){
MutexLockGuard lock_(mutex_); // must lock mutex to protect shared_ptr;
if(conn_){
codec_.send(conn_.get(), message);
}
}
};
#endif /* CHAT_CLIENT_MYSELF_H */
int main(int argc, char *argv[])
{
LOG_INFO << "pid = " << getpid();
if(argc > 2){
EventLoopThread loopThread; // for net-IO
uint16_t port = static_cast<uint16_t>(atoi(argv[2]));
InetAddress addr(argv[1], port);
ChatClientMyself client(loopThread.startLoop(), addr);
client.connect();
std::string input_line;
while(std::getline(std::cin, input_line)){
client.write(input_line);
}
client.disconnect();
CurrentThread::sleepUsec(1000 * 1000);
} else {
printf("Usage: %s <host-IP> <port>\n", argv[0]);
}
return 0;
}
CharClientMyself
一些注意点:
- 连接建立时应该做的事:保存该连接(使用
shared_ptr
类型的TcpConnectionPtr
,RAII模式) - 有数据到达时应该做的事:将其打印在标准输出上
- 多线程:mainThread线程用于读取键盘输入,并将之发送;而loopThread负责进行网络IO
网友评论