美文网首页
boost.asio聊天服务器(muduo)

boost.asio聊天服务器(muduo)

作者: 老杜振熙 | 来源:发表于2020-10-25 14:23 被阅读0次

TCP分包

在发生一个消息或一帧数据时,通过一定的处理,使接收方能够从字节流中识别并截取一个个消息。

对于短连接的TCP服务,分包不是问题:

  1. 发送方主动关闭连接,就代表消息发送完毕;
  2. 接收方read的字节数变为0,代表消息接收完毕;

对于长连接的TCP服务,分包则需要从消息本身去着手:

  1. 可以使消息的长度保持固定;
  2. 可以让消息使用特定的字符或字符串去作为消息边界,如"\r\n"
  3. 可以在消息的头部加上长度字段;
  4. 利用消息自身的格式进行匹配,如JSON文件的{...}匹配机制

数据到达VS消息到达

muduo::net::buffer无法设置回调函数的触发条件,意思也就是说,buffer只能判断是否有数据到达,但具体的消息是否到达完整,则还需要进一步处理。为此,可以增加一层数据编解码器LengthHeaderCodec用于把数据的parse任务进行抽离,这样就简化了代码,并且使得用户只需要关心消息是否已经到达,到达了就调用回调函数。换而言之,回调函数onMessage()变为了onStringMessage(),编写服务端和客户端的程序就只需要考虑消息到达后的发送,而不必考虑字节流的parse,因为LengthHeaderCodec已经为我们提供了这一层服务。具体的调用过程为:

消息到达
 => Codec::OnMessage() # 注册为server_的消息到达回调函数
 => Codec::MessageCallback_() # 由server_.onStringMessage()注册

OK,服务端和客户端都可以将这个技巧进行运用了,甚至可以使用相同的编解码器对象以节省空间。

多线程初步(客户端)

客户端使用两个线程,一个用于读取键盘输入,另一个则用于打印从服务端接收到的数据。

实际上手

  1. 编解码器LengthHeaderCodec
#ifndef LENGTHHEADERCODEC_H
#define LENGTHHEADERCODEC_H

#include <muduo/net/Buffer.h>
#include <muduo/net/TcpConnection.h>
#include <muduo/base/Logging.h>
#include <muduo/net/Endian.h>

class LengthHeaderCodec: muduo::noncopyable // this class is noncopyable
{
private:
    using MessageCallbackFunc = std::function<void (const muduo::net::TcpConnectionPtr &, const muduo::string &, muduo::Timestamp time)>;
    
    const int32_t nHeader = sizeof (int32_t); // length of header;
    MessageCallbackFunc messageCallback_; // this is a r-value;

public:
    explicit LengthHeaderCodec(const MessageCallbackFunc &func): messageCallback_(func)
    {} // 此处的入参类型必须为const引用,因为messageCallback_是一个右值

    void onMessage(const muduo::net::TcpConnectionPtr &conn, muduo::net::Buffer *buf, muduo::Timestamp time){
        // LOG_INFO << " buf->readableBytes() is " << buf->readableBytes(); // test, to show if data is accepted;
        // above LOG_INFO is OK, so bug comes below;
        while(buf->readableBytes() >= nHeader){ // could have data, but completeness can't be guaranteed
            // const void *dataptr = buf->peek();
            // int32_t dataSz = *static_cast<const int32_t *>(dataptr);
            // const int32_t len = muduo::net::sockets::networkToHost32(dataSz);
            const int32_t len = buf->peekInt32(); // this function contains networkToHost32
            if (len<0 || len>65536){
                LOG_ERROR << "Invalid length " << len;
                conn->shutdown();
                break;
            } else if(buf->readableBytes() >= nHeader+len){
                buf->retrieve(nHeader);
                muduo::string message_(buf->peek(), len);
                // only @message_ is used here
                messageCallback_(conn, message_, time); // string message callback, registered by server and client
                buf->retrieve(len);
            } else {
                break; // message is not complete;
            }
        }
    }

    void send(muduo::net::TcpConnection *conn, const muduo::StringPiece &data){
        muduo::net::Buffer buf; 
        buf.append(data.data(), data.size()); // convert to type: muduo::net::Buffer
        int32_t len = static_cast<int32_t>(data.size());
        // int32_t dataSz = muduo::net::sockets::hostToNetwork32(len);
        // buf.prepend(&dataSz, sizeof dataSz);
        buf.prependInt32(len); // this function contains hostToNetwork32
        conn->send(&buf);
    }
};

#endif /* LENGTHHEADERCODEC_H */

LengthHeaderCodec一些注意点:

  • 构造函数处的入参必须为常量引用(当然,值引用也是可以的),原因见注释
  • onMessage()函数的入参conn不能加const限定符,因为TcpConnection类的send()函数只有非const版本。(那为什么不将conn直接声明为TcpConnectionPtr呢?)
  • 字节流的编码和解码:无论具体用什么方式,都要确保双方能够互通,比如发送方将头部编为了网络传输模式,则接收方必须将其重新反转为原格式
  1. 服务端ChatServerMyself
#ifndef CHAT_SERVER_MYSELF_H
#define CHAT_SERVER_MYSELF_H

#include <muduo/net/TcpConnection.h>
#include <muduo/net/TcpServer.h>
#include <muduo/net/Buffer.h>
#include <muduo/net/EventLoop.h>
#include <muduo/base/Logging.h>
#include <set>
#include "LengthHeaderCodec.h"

using namespace std::placeholders;

class ChatServerMyself // 为了方便直接将main函数写在了该文件内
{
private:
    using connSet = std::set<muduo::net::TcpConnectionPtr>;


    LengthHeaderCodec codec_;
    muduo::net::TcpServer server_;
    connSet connects_;
    

public:
    ChatServerMyself(muduo::net::EventLoop *loop, muduo::net::InetAddress addr):
        server_(loop, addr, "CHAT_SERVER_MYSELF"),
        codec_(std::bind(&ChatServerMyself::onStringMessage, this, _1, _2, _3))
        {
            server_.setConnectionCallback(std::bind(&ChatServerMyself::onConnection, this, _1));
            server_.setMessageCallback(std::bind(&LengthHeaderCodec::onMessage, &codec_, _1, _2, _3));
        }


    void start();

    void onConnection(const muduo::net::TcpConnectionPtr &conn);
    void onStringMessage(const muduo::net::TcpConnectionPtr &conn, const muduo::string &message, muduo::Timestamp time);

};

#endif /* CHAT_SERVER_MYSELF_H */

void ChatServerMyself::start(){
    server_.start();
}

void ChatServerMyself::onConnection(const muduo::net::TcpConnectionPtr &conn){
    LOG_INFO << "ASIO Server: " << conn->peerAddress().toIpPort() \
        << " -> " << conn->localAddress().toIpPort() << " is " \
        << (conn->connected() ? "UP" : "DOWN");

    if(conn->connected()){
        connects_.insert(conn);
    } else {
        connects_.erase(conn);
    }
}

void ChatServerMyself::onStringMessage(const muduo::net::TcpConnectionPtr &, const muduo::string &message, muduo::Timestamp ){
    for(connSet::iterator itr = connects_.begin(); itr != connects_.end(); ++itr){ // send data to all clients it connects;
        codec_.send(itr->get(), message);
    }
}

int main(int argc, char *argv[])
{
    LOG_INFO << "pid = " << getpid();
    if(argc > 1){
        muduo::net::EventLoop loop;
        uint16_t port = static_cast<uint16_t>(atoi(argv[1]));
        muduo::net::InetAddress addr(port);
        ChatServerMyself server(&loop, addr);
        server.start();
        loop.loop();
    } else {
        printf("Usage: %s <port>\n", argv[0]);
    }
    return 0;
}

ChatServerMyself一些注意点:

  • 探测到有新连接时应该做的事:将新的连接添加到自己的内部set变量中;
  • 有数据达到时应该做的事:将完整消息传递给与它有连接的所有客户端;
  • 以编解码器作为中间层:这意味着,我们将完整消息到达时对应的回调函数onStringMessage()注册到编解码器codec_中,再将有数据到达时对应的回调函数设置为codec_中的字节流处理函数onMessage();这样一来,至少在编写服务端程序的时候,我们就无需再考虑如何从字节流中截取完整的消息了;
  1. 客户端ChatClientMyself
#ifndef CHAT_CLIENT_MYSELF_H
#define CHAT_CLIENT_MYSELF_H

#include "LengthHeaderCodec.h"
#include <muduo/net/TcpClient.h>
#include <muduo/base/Mutex.h>
#include <muduo/base/Logging.h>
#include <muduo/net/EventLoopThread.h>
#include <iostream>

using namespace muduo;
using namespace muduo::net;

class ChatClientMyself
{
private:
    TcpClient client_;
    TcpConnectionPtr conn_;
    LengthHeaderCodec codec_;
    mutable MutexLock mutex_;
    

public:
    ChatClientMyself(EventLoop *loop, const InetAddress &addr):
        client_(loop, addr, "CHAT_CLIENT_MYSELF"),
        codec_(std::bind(&ChatClientMyself::onStringMessage, this, _1, _2, _3))
    {
        client_.setConnectionCallback(std::bind(&ChatClientMyself::onConnection, this, _1));
        client_.setMessageCallback(std::bind(&LengthHeaderCodec::onMessage, &codec_, _1, _2, _3));
        client_.enableRetry(); // FIXME: what is this about?
    }

    void connect(){
        client_.connect();
    }

    void disconnect(){ // FIXME: is disconnect necessary?
        client_.disconnect();
    }

    void onConnection(const TcpConnectionPtr &conn){
        LOG_INFO << "Cient Server Connection: " \
            << conn->localAddress().toIpPort() << " -> " \
            << conn->peerAddress().toIpPort() << " is " \
            << (conn->connected() ? "UP" : "DOWN");
        if(conn->connected()){
            conn_ = conn;
        } else {
            conn_.reset();
        }
    }

    void onStringMessage(const TcpConnectionPtr &, const string &message, Timestamp){
        // std::cout is not thread-safe, so we use printf instead;
        printf("<<< %s\n", message.c_str()); // print accepted data;
    }

    // send data from STDIN input;
    void write(const string &message){
        MutexLockGuard lock_(mutex_); // must lock mutex to protect shared_ptr;
        if(conn_){
            codec_.send(conn_.get(), message);
        }
    }


};

#endif /* CHAT_CLIENT_MYSELF_H */

int main(int argc, char *argv[])
{
    LOG_INFO << "pid = " << getpid();
    if(argc > 2){
        EventLoopThread loopThread; // for net-IO
        uint16_t port = static_cast<uint16_t>(atoi(argv[2]));
        InetAddress addr(argv[1], port);
        ChatClientMyself client(loopThread.startLoop(), addr);
        client.connect();

        std::string input_line;
        while(std::getline(std::cin, input_line)){
            client.write(input_line);
        }
        client.disconnect();
        CurrentThread::sleepUsec(1000 * 1000);
    } else {
        printf("Usage: %s <host-IP> <port>\n", argv[0]);
    }
    return 0;
}

CharClientMyself一些注意点:

  • 连接建立时应该做的事:保存该连接(使用shared_ptr类型的TcpConnectionPtr,RAII模式)
  • 有数据到达时应该做的事:将其打印在标准输出上
  • 多线程:mainThread线程用于读取键盘输入,并将之发送;而loopThread负责进行网络IO

演示结果

可以(自己和自己)聊天了.png

相关文章

  • boost.asio聊天服务器(muduo)

    TCP分包 在发生一个消息或一帧数据时,通过一定的处理,使接收方能够从字节流中识别并截取一个个消息。 对于短连接的...

  • 关于Boost.Asio

    目的 使用Boost.Asio连接/提供网络服务 了解Boost.Asio提供的功能 了解Boost.Asio的实...

  • muduo中的reactor

    muduo使用的是reactor模式,关于muduo的其他内容不做过多赘述。此文作为自己阅读muduo源码的笔记,...

  • muduo-base部分Timestamp

    muduo::copyable

  • muduo源码分析之Buffer

    这一次我们来分析下muduo中Buffer的作用,我们知道,当我们客户端向服务器发送数据时候,服务器就会读取我们发...

  • muduo源码分析之TcpServer模块

    这次我们开始muduo源代码的实际编写,首先我们知道muduo是LT模式,Reactor模式,下图为Reactor...

  • muduo多线程的处理

    这几天详细读了muduo的网络处理部分,发现多线程处理是整个框架的精华。muduo是基于one loop per ...

  • 向量IO —— muduo网络库Buffer设计理念总结

    注:本文为阅读muduo网络库源码Buffer部分的体悟 本文中Buffer一词均指代muduo网络库的class...

  • 关于muduo你应该知道的基本理论

    最近在看muduo,其中有些东西相对来说还是有些生涩。 服务器模型 c/s模型 p2p模型 两种高效的事件处理模式...

  • Boost asio 1.69.0 随笔

    C++的网络通信 基于C++的网络库有不少比如muduo,libevent等。muduo是感觉写的最容易理解的,b...

网友评论

      本文标题:boost.asio聊天服务器(muduo)

      本文链接:https://www.haomeiwen.com/subject/sonimktx.html