美文网首页
(6)链接与消息封装(Reactor部分)【Lars-基于C++

(6)链接与消息封装(Reactor部分)【Lars-基于C++

作者: 刘丹冰Aceld | 来源:发表于2019-09-18 11:59 被阅读0次

    【Lars教程目录】

    Lars源代码
    https://github.com/aceld/Lars


    【Lars系统概述】
    第1章-概述
    第2章-项目目录构建


    【Lars系统之Reactor模型服务器框架模块】
    第1章-项目结构与V0.1雏形
    第2章-内存管理与Buffer封装
    第3章-事件触发EventLoop
    第4章-链接与消息封装
    第5章-Client客户端模型
    第6章-连接管理及限制
    第7章-消息业务路由分发机制
    第8章-链接创建/销毁Hook机制
    第9章-消息任务队列与线程池
    第10章-配置文件读写功能
    第11章-udp服务与客户端
    第12章-数据传输协议protocol buffer
    第13章-QPS性能测试
    第14章-异步消息任务机制
    第15章-链接属性设置功能


    【Lars系统之DNSService模块】
    第1章-Lars-dns简介
    第2章-数据库创建
    第3章-项目目录结构及环境构建
    第4章-Route结构的定义
    第5章-获取Route信息
    第6章-Route订阅模式
    第7章-Backend Thread实时监控


    【Lars系统之Report Service模块】
    第1章-项目概述-数据表及proto3协议定义
    第2章-获取report上报数据
    第3章-存储线程池及消息队列


    【Lars系统之LoadBalance Agent模块】
    第1章-项目概述及构建
    第2章-主模块业务结构搭建
    第3章-Report与Dns Client设计与实现
    第4章-负载均衡模块基础设计
    第5章-负载均衡获取Host主机信息API
    第6章-负载均衡上报Host主机信息API
    第7章-过期窗口清理与过载超时(V0.5)
    第8章-定期拉取最新路由信息(V0.6)
    第9章-负载均衡获取Route信息API(0.7)
    第10章-API初始化接口(V0.8)
    第11章-Lars Agent性能测试工具
    第12章- Lars启动工具脚本


    5) tcp链接与Message消息封装

    ​ 好了,现在我们来将服务器的连接做一个简单的封装,在这之前,我们要将我我们所发的数据做一个规定,采用TLV的格式,来进行封装。目的是解决TCP传输的粘包问题。

    5.1 Message消息封装

    7-TCP粘包问题-拆包封包过程.jpeg

    ​ 先创建一个message.h头文件

    lars_reactor/include/message.h

    #pragma once
    
    //解决tcp粘包问题的消息头
    struct msg_head
    {
        int msgid;
        int msglen;
    };
    
    //消息头的二进制长度,固定数
    #define MESSAGE_HEAD_LEN 8
    
    //消息头+消息体的最大长度限制
    #define MESSAGE_LENGTH_LIMIT (65535 - MESSAGE_HEAD_LEN)
    

    ​ 接下来我们每次在server和 client之间传递数据的时候,都发送这种数据格式的头再加上后面的数据内容即可。

    5.2 创建一个tcp_conn连接类

    lars_reactor/include/tcp_conn.h

    #pragma once
    
    #include "reactor_buf.h"
    #include "event_loop.h"
    
    //一个tcp的连接信息
    class tcp_conn
    {
    public:
        //初始化tcp_conn
        tcp_conn(int connfd, event_loop *loop);
    
        //处理读业务
        void do_read();
    
        //处理写业务
        void do_write();
    
        //销毁tcp_conn
        void clean_conn();
    
        //发送消息的方法
        int send_message(const char *data, int msglen, int msgid);
    
    private:
        //当前链接的fd
        int _connfd;
        //该连接归属的event_poll
        event_loop *_loop;
        //输出buf
        output_buf obuf;     
        //输入buf
        input_buf ibuf;
    };
    

    简单说明一下里面的成员和方法:

    成员:

    _connfd:server刚刚accept成功的套接字

    _loop:当前链接所绑定的事件触发句柄.

    obuf:链接输出缓冲,向对端写数据

    ibuf:链接输入缓冲,从对端读数据

    方法

    tcp_client():构造,主要在里面实现初始化及创建链接链接的connect过程。

    do_read():读数据处理业务,主要是EPOLLIN事件触发。

    do_write():写数据处理业务,主要是EPOLLOUT事件触发。

    clean_conn():清空链接资源。

    send_message():将消息打包成TLV格式发送给对端。

    ​ 接下来,实现以下tcp_conn类.

    lars_reactor/src/tcp_conn.cpp

    #include <unistd.h>
    #include <fcntl.h>
    #include <sys/types.h>
    #include <sys/socket.h>
    #include <netinet/in.h>
    #include <netinet/tcp.h>
    #include <string.h>
    
    #include "tcp_conn.h"
    #include "message.h"
    
    
    //回显业务
    void callback_busi(const char *data, uint32_t len, int msgid, void *args, tcp_conn *conn)
    {
        conn->send_message(data, len, msgid);
    }
    
    
    //连接的读事件回调
    static void conn_rd_callback(event_loop *loop, int fd, void *args)
    {
        tcp_conn *conn = (tcp_conn*)args;
        conn->do_read();
    }
    //连接的写事件回调
    static void conn_wt_callback(event_loop *loop, int fd, void *args)
    {
        tcp_conn *conn = (tcp_conn*)args;
        conn->do_write();
    }
    
    //初始化tcp_conn
    tcp_conn::tcp_conn(int connfd, event_loop *loop)
    {
        _connfd = connfd;
        _loop = loop;
        //1. 将connfd设置成非阻塞状态
        int flag = fcntl(_connfd, F_GETFL, 0);
        fcntl(_connfd, F_SETFL, O_NONBLOCK|flag);
    
        //2. 设置TCP_NODELAY禁止做读写缓存,降低小包延迟
        int op = 1;
        setsockopt(_connfd, IPPROTO_TCP, TCP_NODELAY, &op, sizeof(op));//need netinet/in.h netinet/tcp.h
    
        //3. 将该链接的读事件让event_loop监控 
        _loop->add_io_event(_connfd, conn_rd_callback, EPOLLIN, this);
    
        //4 将该链接集成到对应的tcp_server中
        //TODO
    }
    
    //处理读业务
    void tcp_conn::do_read()
    {
        //1. 从套接字读取数据
        int ret = ibuf.read_data(_connfd);
        if (ret == -1) {
            fprintf(stderr, "read data from socket\n");
            this->clean_conn();
            return ;
        }
        else if ( ret == 0) {
            //对端正常关闭
            printf("connection closed by peer\n");
            clean_conn();
            return ;
        }
    
        //2. 解析msg_head数据    
        msg_head head;    
        
        //[这里用while,可能一次性读取多个完整包过来]
        while (ibuf.length() >= MESSAGE_HEAD_LEN)  {
            //2.1 读取msg_head头部,固定长度MESSAGE_HEAD_LEN    
            memcpy(&head, ibuf.data(), MESSAGE_HEAD_LEN);
            if(head.msglen > MESSAGE_LENGTH_LIMIT || head.msglen < 0) {
                fprintf(stderr, "data format error, need close, msglen = %d\n", head.msglen);
                this->clean_conn();
                break;
            }
            if (ibuf.length() < MESSAGE_HEAD_LEN + head.msglen) {
                //缓存buf中剩余的数据,小于实际上应该接受的数据
                //说明是一个不完整的包,应该抛弃
                break;
            }
    
            //2.2 再根据头长度读取数据体,然后针对数据体处理 业务
            //TODO 添加包路由模式
            
            //头部处理完了,往后偏移MESSAGE_HEAD_LEN长度
            ibuf.pop(MESSAGE_HEAD_LEN);
            
            //处理ibuf.data()业务数据
            printf("read data: %s\n", ibuf.data());
    
            //回显业务
            callback_busi(ibuf.data(), head.msglen, head.msgid, NULL, this);
    
            //消息体处理完了,往后便宜msglen长度
            ibuf.pop(head.msglen);
        }
    
        ibuf.adjust();
        
        return ;
    }
    
    //处理写业务
    void tcp_conn::do_write()
    {
        //do_write是触发玩event事件要处理的事情,
        //应该是直接将out_buf力度数据io写会对方客户端 
        //而不是在这里组装一个message再发
        //组装message的过程应该是主动调用
        
        //只要obuf中有数据就写
        while (obuf.length()) {
            int ret = obuf.write2fd(_connfd);
            if (ret == -1) {
                fprintf(stderr, "write2fd error, close conn!\n");
                this->clean_conn();
                return ;
            }
            if (ret == 0) {
                //不是错误,仅返回0表示不可继续写
                break;
            }
        }
    
        if (obuf.length() == 0) {
            //数据已经全部写完,将_connfd的写事件取消掉
            _loop->del_io_event(_connfd, EPOLLOUT);
        }
    
        return ;
    }
    
    //发送消息的方法
    int tcp_conn::send_message(const char *data, int msglen, int msgid)
    {
        printf("server send_message: %s:%d, msgid = %d\n", data, msglen, msgid);
        bool active_epollout = false; 
        if(obuf.length() == 0) {
            //如果现在已经数据都发送完了,那么是一定要激活写事件的
            //如果有数据,说明数据还没有完全写完到对端,那么没必要再激活等写完再激活
            active_epollout = true;
        }
    
        //1 先封装message消息头
        msg_head head;
        head.msgid = msgid;
        head.msglen = msglen;
         
        //1.1 写消息头
        int ret = obuf.send_data((const char *)&head, MESSAGE_HEAD_LEN);
        if (ret != 0) {
            fprintf(stderr, "send head error\n");
            return -1;
        }
    
        //1.2 写消息体
        ret = obuf.send_data(data, msglen);
        if (ret != 0) {
            //如果写消息体失败,那就回滚将消息头的发送也取消
            obuf.pop(MESSAGE_HEAD_LEN);
            return -1;
        }
    
        if (active_epollout == true) {
            //2. 激活EPOLLOUT写事件
            _loop->add_io_event(_connfd, conn_wt_callback, EPOLLOUT, this);
        }
    
    
        return 0;
    }
    
    //销毁tcp_conn
    void tcp_conn::clean_conn()
    {
        //链接清理工作
        //1 将该链接从tcp_server摘除掉    
        //TODO 
        //2 将该链接从event_loop中摘除
        _loop->del_io_event(_connfd);
        //3 buf清空
        ibuf.clear(); 
        obuf.clear();
        //4 关闭原始套接字
        int fd = _connfd;
        _connfd = -1;
        close(fd);
    }
    

    ​ 具体每个方法的实现,都很清晰。其中conn_rd_callback()conn_wt_callback()是注册读写事件的回调函数,设置为static是因为函数类型没有this指针。在里面分别再调用do_read()do_write()方法。

    5.3 修正tcp_server对accept之后的处理方法

    lars_reactor/src/tcp_server.cpp

    //...
    
    //开始提供创建链接服务
    void tcp_server::do_accept()
    {
        int connfd;    
        while(true) {
            //accept与客户端创建链接
            printf("begin accept\n");
            connfd = accept(_sockfd, (struct sockaddr*)&_connaddr, &_addrlen);
            if (connfd == -1) {
                if (errno == EINTR) {
                    fprintf(stderr, "accept errno=EINTR\n");
                    continue;
                }
                else if (errno == EMFILE) {
                    //建立链接过多,资源不够
                    fprintf(stderr, "accept errno=EMFILE\n");
                }
                else if (errno == EAGAIN) {
                    fprintf(stderr, "accept errno=EAGAIN\n");
                    break;
                }
                else {
                    fprintf(stderr, "accept error");
                    exit(1);
                }
            }
            else {
                //accept succ!
                // ============= 将之前的触发回调的删掉,改成如下====
                tcp_conn *conn = new tcp_conn(connfd, _loop);
                if (conn == NULL) {
                    fprintf(stderr, "new tcp_conn error\n");
                    exit(1);
                }
                // ============================================
                printf("get new connection succ!\n");
                break;
            }
        }
    }
    
    //...
    

    ​ 这样,每次accept成功之后,创建一个与当前客户端套接字绑定的tcp_conn对象。在构造里就完成了基本的对于EPOLLIN事件的监听和回调动作.

    ​ 现在可以先编译一下,保证没有语法错误,但是如果想测试,就不能够使用nc指令测试了,因为现在服务端只能够接收我们自定义的TLV格式的报文。那么我们需要自己写一个客户端来完成基本的测试。


    关于作者:

    作者:Aceld(刘丹冰)

    mail: danbing.at@gmail.com
    github: https://github.com/aceld
    原创书籍gitbook: http://legacy.gitbook.com/@aceld

    原创声明:未经作者允许请勿转载, 如果转载请注明出处

    相关文章

      网友评论

          本文标题:(6)链接与消息封装(Reactor部分)【Lars-基于C++

          本文链接:https://www.haomeiwen.com/subject/qeohuctx.html