美文网首页
(7)客户端模型(Reactor部分)【Lars-基于C++负载

(7)客户端模型(Reactor部分)【Lars-基于C++负载

作者: 刘丹冰Aceld | 来源:发表于2019-09-18 12:00 被阅读0次

    【Lars教程目录】

    Lars源代码
    https://github.com/aceld/Lars


    【Lars系统概述】
    第1章-概述
    第2章-项目目录构建


    【Lars系统之Reactor模型服务器框架模块】
    第1章-项目结构与V0.1雏形
    第2章-内存管理与Buffer封装
    第3章-事件触发EventLoop
    第4章-链接与消息封装
    第5章-Client客户端模型
    第6章-连接管理及限制
    第7章-消息业务路由分发机制
    第8章-链接创建/销毁Hook机制
    第9章-消息任务队列与线程池
    第10章-配置文件读写功能
    第11章-udp服务与客户端
    第12章-数据传输协议protocol buffer
    第13章-QPS性能测试
    第14章-异步消息任务机制
    第15章-链接属性设置功能


    【Lars系统之DNSService模块】
    第1章-Lars-dns简介
    第2章-数据库创建
    第3章-项目目录结构及环境构建
    第4章-Route结构的定义
    第5章-获取Route信息
    第6章-Route订阅模式
    第7章-Backend Thread实时监控


    【Lars系统之Report Service模块】
    第1章-项目概述-数据表及proto3协议定义
    第2章-获取report上报数据
    第3章-存储线程池及消息队列


    【Lars系统之LoadBalance Agent模块】
    第1章-项目概述及构建
    第2章-主模块业务结构搭建
    第3章-Report与Dns Client设计与实现
    第4章-负载均衡模块基础设计
    第5章-负载均衡获取Host主机信息API
    第6章-负载均衡上报Host主机信息API
    第7章-过期窗口清理与过载超时(V0.5)
    第8章-定期拉取最新路由信息(V0.6)
    第9章-负载均衡获取Route信息API(0.7)
    第10章-API初始化接口(V0.8)
    第11章-Lars Agent性能测试工具
    第12章- Lars启动工具脚本


    6) tcp客户端触发模型

    ​ 我们可以给客户端添加触发模型。同时也提供一系列的接口供开发者写客户端应用程序来使用。

    6.1 tcp_client类设计

    lars_reactor/include/tcp_client.h

    #pragma once
    
    #include "io_buf.h"
    #include "event_loop.h"
    #include "message.h"
    #include <sys/types.h>
    #include <sys/socket.h>
    #include <netinet/in.h>
    #include <arpa/inet.h>
    
    class tcp_client
    {
    public:
        //初始化客户端套接字
        tcp_client(event_loop *loop, const char *ip, unsigned short port,  const char *name);
    
        //发送message方法
        int send_message(const char *data, int msglen, int msgid);
    
        //创建链接
        void do_connect();
    
        //处理读业务
        int do_read();
        
        //处理写业务
        int do_write();
        
        //释放链接资源
        void clean_conn();
    
        ~tcp_client();
    
    
        //设置业务处理回调函数
        void set_msg_callback(msg_callback *msg_cb) 
        {
            this->_msg_callback = msg_cb;
        }
    
        bool connected; //链接是否创建成功
        //server端地址
        struct sockaddr_in _server_addr;
        io_buf _obuf;
        io_buf _ibuf;
    
    private:
        int _sockfd;
        socklen_t _addrlen;
    
        //客户端的事件处理机制
        event_loop* _loop;
    
        //当前客户端的名称 用户记录日志
        const char *_name;
    
        msg_callback *_msg_callback;
    };
    

    ​ 这里注意的是,tcp_client并不是tcp_server的一部分,而是单纯为写客户端提供的接口。所以这里也需要实现一套对读写事件处理的业务。 这里使用的读写缓冲是原始的io_buf,并不是服务器封装好的reactor_buf原因是后者是转为server做了一层封装,io_buf的基本方法比较全。

    关键成员:

    _sockfd:当前客户端套接字。

    _server_addr: 链接的服务端的IP地址。

    _loop: 客户端异步触发事件机制event_loop句柄。

    _msg_callback: 当前客户端处理服务端的回调业务。

    connected:是否已经成功connect服务端的标致。

    方法:

    tcp_client():构造函数,主要是在里面完成基本的套接字初始化及connect操作.

    do_connect():创建链接

    do_read():处理链接的读业务。

    do_write():处理链接的写业务。

    clean_conn():清空链接资源。

    6.2 创建链接

    lars_reactor/src/tcp_client.cpp

    tcp_client::tcp_client(event_loop *loop, const char *ip, unsigned short port, const char *name):
    _ibuf(4194304),
    _obuf(4194304)
    {
        _sockfd = -1;
        _msg_callback = NULL;
        _name = name;
        _loop = loop;
        
        bzero(&_server_addr, sizeof(_server_addr));
        
        _server_addr.sin_family = AF_INET; 
        inet_aton(ip, &_server_addr.sin_addr);
        _server_addr.sin_port = htons(port);
    
        _addrlen = sizeof(_server_addr);
    
        this->do_connect();
    }
    

    ​ 这里初始化tcp_client链接信息,然后调用do_connect()创建链接.

    lars_reactor/src/tcp_client.cpp

    //创建链接
    void tcp_client::do_connect()
    {
        if (_sockfd != -1) {
            close(_sockfd);
        }
    
        //创建套接字
        _sockfd = socket(AF_INET, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK, IPPROTO_TCP);
        if (_sockfd == -1) {
            fprintf(stderr, "create tcp client socket error\n");
            exit(1);
        }
    
        int ret = connect(_sockfd, (const struct sockaddr*)&_server_addr, _addrlen);
        if (ret == 0) {
            //链接创建成功      
            connected = true; 
    
            //注册读回调
            _loop->add_io_event(_sockfd, read_callback, EPOLLIN, this);
            //如果写缓冲去有数据,那么也需要触发写回调
            if (this->_obuf.length != 0) {
                _loop->add_io_event(_sockfd, write_callback, EPOLLOUT, this);
            }
                
            printf("connect %s:%d succ!\n", inet_ntoa(_server_addr.sin_addr), ntohs(_server_addr.sin_port));
        }
        else {
            if(errno == EINPROGRESS) {
                //fd是非阻塞的,可能会出现这个错误,但是并不表示链接创建失败
                //如果fd是可写状态,则为链接是创建成功的.
                fprintf(stderr, "do_connect EINPROGRESS\n");
    
                //让event_loop去触发一个创建判断链接业务 用EPOLLOUT事件立刻触发
                _loop->add_io_event(_sockfd, connection_delay, EPOLLOUT, this);
            }
            else {
                fprintf(stderr, "connection error\n");
                exit(1);
            }
        }
    
    }
    

    6.3 有关非阻塞客户端socket创建链接问题

    ​ 这里转载一篇文章,是有关非阻塞套接字,connect返回-1,并且errno是EINPROGRESS的情况。因为我们的client是采用event_loop形式,socket需要被设置为非阻塞。所以需要针对这个情况做处理。下面是说明。

    ​ 客户端测试程序时,由于出现很多客户端,经过connect成功后,代码卡在recv系统调用中,后来发现可能是由于socket默认是阻塞模式,所以会令很多客户端链接处于链接却不能传输数据状态。

    ​ 后来修改socket为非阻塞模式,但在connect的时候,发现返回值为-1,刚开始以为是connect出现错误,但在服务器上看到了链接是ESTABLISED状态。证明链接是成功的

    ​ 但为什么会出现返回值是-1呢? 经过查询资料,以及看stevens的APUE,也发现有这么一说。

    ​ 当connect在非阻塞模式下,会出现返回-1值,错误码是EINPROGRESS,但如何判断connect是联通的呢?stevens书中说明要在connect后,继续判断该socket是否可写?

    若可写,则证明链接成功。

    ​ 如何判断可写,有2种方案,一种是select判断是否可写,二用poll模型。

    select:

    int CheckConnect(int iSocket)
    {
            fd_set rset;
    
            FD_ZERO(&rset);
            FD_SET(iSocket, &rset);
    
            timeval tm;
            tm. tv_sec = 0;
            tm.tv_usec = 0;
    
            if ( select(iSocket + 1, NULL, &rset, NULL, &tval) <= 0)
            {
                close(iSocket);
                return -1;
            }
    
            if (FD_ISSET(iSocket, &rset))
            {
                int err = -1;
                socklen_t len = sizeof(int);
                    if ( getsockopt(iSocket,  SOL_SOCKET, SO_ERROR ,&err, &len) < 0 )
                    {
                        close(iSocket);
                        printf("errno:%d %s\n", errno, strerror(errno));
                        return -2;
                    }
    
                    if (err)
                    {
                        errno = err;
                        close(iSocket);
    
                    return -3;
                    }
            }
    
            return 0;
    }
    

    poll:

    int CheckConnect(int iSocket) {
        struct pollfd fd;
        int ret = 0;
        socklen_t len = 0;
      fd.fd = iSocket;
        fd.events = POLLOUT;
    
        while ( poll (&fd, 1, -1) == -1 ) {
            if( errno != EINTR ){
                perror("poll");
                return -1;
            }
        }
    
        len = sizeof(ret);
        if ( getsockopt (iSocket, SOL_SOCKET, SO_ERROR, &ret, &len) == -1 ) {
                perror("getsockopt");
            return -1;
        }
    
        if(ret != 0) {
            fprintf (stderr, "socket %d connect failed: %s\n",
                 iSocket, strerror (ret));
            return -1;
        }
    
        return 0;
    }
    

    6.3 针对EINPROGRESS的连接创建处理

    ​ 看上面do_connect()的代码其中一部分:

      if(errno == EINPROGRESS) {
                //fd是非阻塞的,可能会出现这个错误,但是并不表示链接创建失败
                //如果fd是可写状态,则为链接是创建成功的.
                fprintf(stderr, "do_connect EINPROGRESS\n");
    
                //让event_loop去触发一个创建判断链接业务 用EPOLLOUT事件立刻触发
                _loop->add_io_event(_sockfd, connection_delay, EPOLLOUT, this);
            }
    

    这里是又触发一个写事件,直接让程序流程跳转到connection_delay()方法.那么我们需要在里面判断链接是否已经判断成功,并且做出一定的创建成功之后的业务动作。

    lars_reactor/src/tcp_client.cpp

    //判断链接是否是创建链接,主要是针对非阻塞socket 返回EINPROGRESS错误
    static void connection_delay(event_loop *loop, int fd, void *args)
    {
        tcp_client *cli = (tcp_client*)args;
        loop->del_io_event(fd);
    
        int result = 0;
        socklen_t result_len = sizeof(result);
        getsockopt(fd, SOL_SOCKET, SO_ERROR, &result, &result_len);
        if (result == 0) {
            //链接是建立成功的
            cli->connected = true;
    
            printf("connect %s:%d succ!\n", inet_ntoa(cli->_server_addr.sin_addr), ntohs(cli->_server_addr.sin_port));
    
            //建立连接成功之后,主动发送send_message
            const char *msg = "hello lars!";
            int msgid = 1;
            cli->send_message(msg, strlen(msg), msgid);
    
            loop->add_io_event(fd, read_callback, EPOLLIN, cli);
    
            if (cli->_obuf.length != 0) {
                //输出缓冲有数据可写
                loop->add_io_event(fd, write_callback, EPOLLOUT, cli);
            }
        }
        else {
            //链接创建失败
            fprintf(stderr, "connection %s:%d error\n", inet_ntoa(cli->_server_addr.sin_addr), ntohs(cli->_server_addr.sin_port));
        }
    }
    

    ​ 这是一个事件回调,所以用的是static方法而不是成员方法。首先是利用getsockopt判断链接是否创建成功,如果成功,那么 我们当前这个版本的客户端是直接写死主动调用send_message()方法发送给服务端一个hello lars!字符串。然后直接交给我们的read_callback()方法处理,当然如果写缓冲有数据,我们也会触发写的write_callback()方法。

    ​ 接下来,看看这两个callback以及send_message是怎么实现的。

    callback

    lars_reactor/src/tcp_client.cpp

    static void write_callback(event_loop *loop, int fd, void *args)
    {
        tcp_client *cli = (tcp_client *)args;
        cli->do_write();
    }
    
    static void read_callback(event_loop *loop, int fd, void *args)
    {
        tcp_client *cli = (tcp_client *)args;
        cli->do_read();
    }
    
    //处理读业务
    int tcp_client::do_read()
    {
        //确定已经成功建立连接
        assert(connected == true);
        // 1. 一次性全部读取出来
        
        //得到缓冲区里有多少字节要被读取,然后将字节数放入b里面。   
        int need_read = 0;
        if (ioctl(_sockfd, FIONREAD, &need_read) == -1) {
            fprintf(stderr, "ioctl FIONREAD error");
            return -1;
        }
    
    
        //确保_buf可以容纳可读数据
        assert(need_read <= _ibuf.capacity - _ibuf.length);
    
        int ret;
    
        do {
            ret = read(_sockfd, _ibuf.data + _ibuf.length, need_read);
        } while(ret == -1 && errno == EINTR);
    
        if (ret == 0) {
            //对端关闭
            if (_name != NULL) {
                printf("%s client: connection close by peer!\n", _name);
            }
            else {
                printf("client: connection close by peer!\n");
            }
    
            clean_conn();
            return -1;
        }
        else if (ret == -1) {
            fprintf(stderr, "client: do_read() , error\n");
            clean_conn();
            return -1;
        }
    
        
        assert(ret == need_read);
        _ibuf.length += ret;
    
        //2. 解包
        msg_head head;
        int msgid, length;
        while (_ibuf.length >= MESSAGE_HEAD_LEN) {
            memcpy(&head, _ibuf.data + _ibuf.head, MESSAGE_HEAD_LEN);
            msgid = head.msgid; 
            length = head.msglen;
    
            /*
            if (length + MESSAGE_HEAD_LEN < _ibuf.length) {
                break;
            }
            */
    
            //头部读取完毕
            _ibuf.pop(MESSAGE_HEAD_LEN);
    
            //3. 交给业务函数处理
            if (_msg_callback != NULL) {
                this->_msg_callback(_ibuf.data + _ibuf.head, length, msgid, this, NULL);
            }
        
            //数据区域处理完毕
            _ibuf.pop(length);
        }
        
        //重置head指针
        _ibuf.adjust();
    
        return 0;
    }
    
    //处理写业务
    int tcp_client::do_write()
    {
        //数据有长度,切头部索引是起始位置
        assert(_obuf.head == 0 && _obuf.length);
    
        int ret;
    
        while (_obuf.length) {
            //写数据
            do {
                ret = write(_sockfd, _obuf.data, _obuf.length);
            } while(ret == -1 && errno == EINTR);//非阻塞异常继续重写
    
            if (ret > 0) {
               _obuf.pop(ret);
               _obuf.adjust();
            } 
            else if (ret == -1 && errno != EAGAIN) {
                fprintf(stderr, "tcp client write \n");
                this->clean_conn();
            }
            else {
                //出错,不能再继续写
                break;
            }
        }
    
        if (_obuf.length == 0) {
            //已经写完,删除写事件
            printf("do write over, del EPOLLOUT\n");
            this->_loop->del_io_event(_sockfd, EPOLLOUT);
        }
    
        return 0;
    }
    
    //释放链接资源,重置连接
    void tcp_client::clean_conn()
    {
        if (_sockfd != -1) {
            printf("clean conn, del socket!\n");
            _loop->del_io_event(_sockfd);
            close(_sockfd);
        }
    
        connected = false;
    
        //重新连接
        this->do_connect();
    }
    
    tcp_client::~tcp_client()
    {
        close(_sockfd);
    }
    

    ​ 这里是基本的读数据和写数据的处理业务实现。我们重点看do_read()方法,里面有段代码:

            //3. 交给业务函数处理
            if (_msg_callback != NULL) {
                this->_msg_callback(_ibuf.data + _ibuf.head, length, msgid, this, NULL);
            }
    

    ​ 是将我们从服务端读取到的代码,交给了_msg_callback()方法来处理的,这个实际上是用户开发者自己在业务上注册的回调业务函数。在tcp_client.h中我们已经提供了set_msg_callback暴露给开发者注册使用。


    send_message

    lars_reactor/src/tcp_client.cpp

    //主动发送message方法
    int tcp_client::send_message(const char *data, int msglen, int msgid)
    {
        if (connected == false) {
            fprintf(stderr, "no connected , send message stop!\n");
            return -1;
        }
    
        //是否需要添加写事件触发
        //如果obuf中有数据,没必要添加,如果没有数据,添加完数据需要触发
        bool need_add_event = (_obuf.length == 0) ? true:false;
        if (msglen + MESSAGE_HEAD_LEN > this->_obuf.capacity - _obuf.length) {
            fprintf(stderr, "No more space to Write socket!\n");
            return -1;
        }
    
        //封装消息头
        msg_head head;
        head.msgid = msgid;
        head.msglen = msglen;
    
        memcpy(_obuf.data + _obuf.length, &head, MESSAGE_HEAD_LEN);
        _obuf.length += MESSAGE_HEAD_LEN;
    
        memcpy(_obuf.data + _obuf.length, data, msglen);
        _obuf.length += msglen;
    
        if (need_add_event) {
            _loop->add_io_event(_sockfd, write_callback, EPOLLOUT, this);
        }
    
        return 0;
    }
    

    ​ 将发送的数据写给obuf,然后出发write_callback将obuf的数据传递给对方服务端。

    6.4 完成Lars Reactor V0.4开发

    ​ 好了,现在我们框架部分已经完成,接下来我们就要实现一个serverapp 和 一个clientapp来进行测试.

    我们创建example/lars_reactor_0.4文件夹。

    Makefile

    CXX=g++
    CFLAGS=-g -O2 -Wall -fPIC -Wno-deprecated 
    
    INC=-I../../include
    LIB=-L../../lib -llreactor -lpthread
    OBJS = $(addsuffix .o, $(basename $(wildcard *.cc)))
    
    all:
            $(CXX) -o server $(CFLAGS)  server.cpp $(INC) $(LIB)
            $(CXX) -o client $(CFLAGS)  client.cpp $(INC) $(LIB)
    
    clean:
            -rm -f *.o server client
    

    服务端代码:

    server.cpp

    #include "tcp_server.h"
    
    int main() 
    {
        event_loop loop;
    
        tcp_server server(&loop, "127.0.0.1", 7777);
    
        loop.event_process();
    
        return 0;
    }
    

    客户端代码:

    client.cpp

    #include "tcp_client.h"
    #include <stdio.h>
    #include <string.h>
    
    //客户端业务
    void busi(const char *data, uint32_t len, int msgid, tcp_client *conn, void *user_data)
    {
        //得到服务端回执的数据  
        printf("recv server: [%s]\n", data);
        printf("msgid: [%d]\n", msgid);
        printf("len: [%d]\n", len);
    }
    
    int main() 
    {
        event_loop loop;
    
        //创建tcp客户端
        tcp_client client(&loop, "127.0.0.1", 7777, "clientv0.4");
    
        //注册回调业务
        client.set_msg_callback(busi);
    
        //开启事件监听
        loop.event_process();
    
        return 0;
    }
    

    编译并分别启动server 和client

    服务端输出:

    $ ./server 
    begin accept
    get new connection succ!
    read data: hello lars!
    server send_message: hello lars!:11, msgid = 1
    

    客户端输出:

    $ ./client 
    do_connect EINPROGRESS
    connect 127.0.0.1:7777 succ!
    do write over, del EPOLLOUT
    recv server: [hello lars!]
    msgid: [1]
    len: [11]
    

    ​ 现在客户端已经成功的发送数据给服务端,并且回显的数据也直接被客户端解析,我们的框架到现在就可以做一个基本的客户端和服务端的完成了,但是还差很多,接下来我们继续优化。


    关于作者:

    作者:Aceld(刘丹冰)

    mail: danbing.at@gmail.com
    github: https://github.com/aceld
    原创书籍gitbook: http://legacy.gitbook.com/@aceld

    原创声明:未经作者允许请勿转载, 如果转载请注明出处

    相关文章

      网友评论

          本文标题:(7)客户端模型(Reactor部分)【Lars-基于C++负载

          本文链接:https://www.haomeiwen.com/subject/smohuctx.html