美文网首页
(13)udp服务与客户端(Reactor部分)-【Lars-基

(13)udp服务与客户端(Reactor部分)-【Lars-基

作者: 刘丹冰Aceld | 来源:发表于2019-10-07 13:01 被阅读0次

    【Lars教程目录】

    Lars源代码
    https://github.com/aceld/Lars


    【Lars系统概述】
    第1章-概述
    第2章-项目目录构建


    【Lars系统之Reactor模型服务器框架模块】
    第1章-项目结构与V0.1雏形
    第2章-内存管理与Buffer封装
    第3章-事件触发EventLoop
    第4章-链接与消息封装
    第5章-Client客户端模型
    第6章-连接管理及限制
    第7章-消息业务路由分发机制
    第8章-链接创建/销毁Hook机制
    第9章-消息任务队列与线程池
    第10章-配置文件读写功能
    第11章-udp服务与客户端
    第12章-数据传输协议protocol buffer
    第13章-QPS性能测试
    第14章-异步消息任务机制
    第15章-链接属性设置功能


    【Lars系统之DNSService模块】
    第1章-Lars-dns简介
    第2章-数据库创建
    第3章-项目目录结构及环境构建
    第4章-Route结构的定义
    第5章-获取Route信息
    第6章-Route订阅模式
    第7章-Backend Thread实时监控


    【Lars系统之Report Service模块】
    第1章-项目概述-数据表及proto3协议定义
    第2章-获取report上报数据
    第3章-存储线程池及消息队列


    【Lars系统之LoadBalance Agent模块】
    第1章-项目概述及构建
    第2章-主模块业务结构搭建
    第3章-Report与Dns Client设计与实现
    第4章-负载均衡模块基础设计
    第5章-负载均衡获取Host主机信息API
    第6章-负载均衡上报Host主机信息API
    第7章-过期窗口清理与过载超时(V0.5)
    第8章-定期拉取最新路由信息(V0.6)
    第9章-负载均衡获取Route信息API(0.7)
    第10章-API初始化接口(V0.8)
    第11章-Lars Agent性能测试工具
    第12章- Lars启动工具脚本


    12) udp服务与客户端

    ​ 接下来为了让Reactor框架功能更加丰富,结合之前的功能,再加上udpserver的服务接口。udp我们暂时不考虑加线程池实现,只是单线程的处理方式。

    12.1 udp_server服务端功能实现

    lars_reactor/include/udp_server.h

    #pragma  once
    
    #include <netinet/in.h>
    #include "event_loop.h"
    #include "net_connection.h"
    #include "message.h"
    
    class udp_server :public net_connection 
    {
    public:
        udp_server(event_loop *loop, const char *ip, uint16_t port);
    
        virtual int send_message(const char *data, int msglen, int msgid);
    
        //注册消息路由回调函数
        void add_msg_router(int msgid, msg_callback* cb, void *user_data = NULL);
    
        ~udp_server();
    
        //处理消息业务
        void do_read();
        
    private:
        int _sockfd;
    
        char _read_buf[MESSAGE_LENGTH_LIMIT];
        char _write_buf[MESSAGE_LENGTH_LIMIT];
    
        //事件触发
        event_loop* _loop;
    
        //服务端ip
        struct sockaddr_in _client_addr;
        socklen_t _client_addrlen;
        
        //消息路由分发
        msg_router _router;
    };
    

    ​ 对应的方法实现方式如下:

    lars_reactor/src/udp_server.cpp

    #include <signal.h>
    #include <unistd.h>
    #include <strings.h>
    #include <sys/socket.h>
    #include <sys/types.h>
    #include <netinet/in.h>
    #include <arpa/inet.h>
    #include <stdio.h>
    #include <string.h>
    #include "udp_server.h"
    
    
    void read_callback(event_loop *loop, int fd, void *args)
    {
        udp_server *server = (udp_server*)args;
    
        //处理业务函数
        server->do_read();
    }
    
    void udp_server::do_read()
    {
        while (true) {
            int pkg_len = recvfrom(_sockfd, _read_buf, sizeof(_read_buf), 0, (struct sockaddr *)&_client_addr, &_client_addrlen);
    
            if (pkg_len == -1) {
                if (errno == EINTR) {
                    continue;
                }
                else if (errno == EAGAIN) {
                    break;
                }
                else {
                    perror("recvfrom\n");
                    break;
                }
            }
    
            //处理数据
            msg_head head; 
            memcpy(&head, _read_buf, MESSAGE_HEAD_LEN);
            if (head.msglen > MESSAGE_LENGTH_LIMIT || head.msglen < 0 || head.msglen + MESSAGE_HEAD_LEN != pkg_len) {
                //报文格式有问题
                fprintf(stderr, "do_read, data error, msgid = %d, msglen = %d, pkg_len = %d\n", head.msgid, head.msglen, pkg_len);
                continue;
            }
    
            //调用注册的路由业务
            _router.call(head.msgid, head.msglen, _read_buf+MESSAGE_HEAD_LEN, this);
        }
    }
    
    
    udp_server::udp_server(event_loop *loop, const char *ip, uint16_t port)
    {
        //1 忽略一些信号
        if (signal(SIGHUP, SIG_IGN) == SIG_ERR) {
            perror("signal ignore SIGHUB");
            exit(1);
        }
        if (signal(SIGPIPE, SIG_IGN) == SIG_ERR) {
            perror("signal ignore SIGPIPE");
            exit(1);
        }
        
        //2 创建套接字
        //SOCK_CLOEXEC在execl中使用该socket则关闭,在fork中使用该socket不关闭
        _sockfd = socket(AF_INET, SOCK_DGRAM | SOCK_NONBLOCK | SOCK_CLOEXEC, IPPROTO_UDP);
        if (_sockfd == -1) {
            perror("create udp socket");
            exit(1);
        }
    
        //3 设置服务ip+port
        struct sockaddr_in servaddr;
        bzero(&servaddr, sizeof(servaddr));
        servaddr.sin_family = AF_INET;
        inet_aton(ip, &servaddr.sin_addr);//设置ip
        servaddr.sin_port = htons(port);//设置端口
    
        //4 绑定
        bind(_sockfd, (const struct sockaddr*)&servaddr, sizeof(servaddr));
        
        //3 添加读业务事件
        _loop = loop;
    
        bzero(&_client_addr, sizeof(_client_addr));
        _client_addrlen = sizeof(_client_addr);
        
    
        printf("server on %s:%u is running...\n", ip, port);
    
        _loop->add_io_event(_sockfd, read_callback, EPOLLIN, this);
        
    }
    
    int udp_server::send_message(const char *data, int msglen, int msgid)
    {
        if (msglen > MESSAGE_LENGTH_LIMIT) {
            fprintf(stderr, "too large message to send\n");
            return -1;
        }
    
        msg_head head;
        head.msglen = msglen;
        head.msgid = msgid;
    
        memcpy(_write_buf,  &head, MESSAGE_HEAD_LEN);
        memcpy(_write_buf + MESSAGE_HEAD_LEN, data, msglen);
    
        int ret = sendto(_sockfd, _write_buf, msglen + MESSAGE_HEAD_LEN, 0, (struct sockaddr*)&_client_addr, _client_addrlen);
        if (ret == -1) {
            perror("sendto()..");
            return -1;
        }
    
        return ret;
    }
    
    //注册消息路由回调函数
    void udp_server::add_msg_router(int msgid, msg_callback* cb, void *user_data)
    {
        _router.register_msg_router(msgid, cb, user_data);
    }
    
    udp_server::~udp_server()
    {
        _loop->del_io_event(_sockfd);
        close(_sockfd);
    }
    

    ​ 这里面实现的方式和tcp_server的实现方式几乎一样,需要注意的是,udp的socket编程是不需要listen的,而且也不需要accept。所以recvfrom就能够得知每个包的对应客户端是谁,然后回执消息给对应的客户端就可以。因为没有连接,所以都是以包为单位来处理的,一个包一个包处理。可能相邻的两个包来自不同的客户端。

    12.2 udp_client客户端功能实现

    lars_reactor/include/udp_client.h

    #pragma once
    
    #include "net_connection.h"
    #include "message.h"
    #include "event_loop.h"
    
    class udp_client: public net_connection
    {
    public:
        udp_client(event_loop *loop, const char *ip, uint16_t port);
        ~udp_client();
    
        void add_msg_router(int msgid, msg_callback *cb, void *user_data = NULL);
    
        virtual int send_message(const char *data, int msglen, int msgid);
    
        //处理消息
        void do_read();
    
    private:
    
        int _sockfd;
    
        char _read_buf[MESSAGE_LENGTH_LIMIT];
        char _write_buf[MESSAGE_LENGTH_LIMIT];
    
        //事件触发
        event_loop *_loop;
    
        //消息路由分发
        msg_router _router;
    };
    

    lars_reactor/src/udp_client.cpp

    #include "udp_client.h"
    #include <sys/types.h>
    #include <sys/socket.h>
    #include <arpa/inet.h>
    #include <unistd.h>
    #include <strings.h>
    #include <string.h>
    #include <stdio.h>
    
    
    void read_callback(event_loop *loop, int fd, void *args)
    {
        udp_client *client = (udp_client*)args;
        client->do_read();
    }
    
    
    
    udp_client::udp_client(event_loop *loop, const char *ip, uint16_t port)
    {
        //1 创建套接字
        _sockfd = socket(AF_INET, SOCK_DGRAM | SOCK_NONBLOCK | SOCK_CLOEXEC, IPPROTO_UDP);
        if (_sockfd == -1) {
            perror("create socket error");
            exit(1);
        }
    
        struct sockaddr_in servaddr;
        bzero(&servaddr, sizeof(servaddr));
        servaddr.sin_family = AF_INET;
        inet_aton(ip, &servaddr.sin_addr);
        servaddr.sin_port = htons(port);
    
        //2 链接
        int ret = connect(_sockfd, (const struct sockaddr*)&servaddr, sizeof(servaddr));
        if (ret  == -1) {
            perror("connect");
            exit(1);
        }
    
    
        //3 添加读事件
        _loop = loop; 
        _loop->add_io_event(_sockfd, read_callback, EPOLLIN, this);
    }
    
    udp_client::~udp_client()
    {
        _loop->del_io_event(_sockfd);
        close(_sockfd);
    }
    
    //处理消息
    void udp_client::do_read()
    {
        while (true) {
            int pkt_len = recvfrom(_sockfd, _read_buf, sizeof(_read_buf), 0, NULL, NULL);
            if (pkt_len == -1) {
                if (errno == EINTR) {
                    continue;
                }
                else if (errno == EAGAIN) {
                    break;
                }
                else {
                    perror("recvfrom()");
                    break;
                }
            }
    
            //处理客户端包
            msg_head head; 
            memcpy(&head, _read_buf, MESSAGE_HEAD_LEN);
            if (head.msglen > MESSAGE_LENGTH_LIMIT || head.msglen < 0 || head.msglen + MESSAGE_HEAD_LEN != pkt_len) {
                //报文格式有问题
                fprintf(stderr, "do_read, data error, msgid = %d, msglen = %d, pkt_len = %d\n", head.msgid, head.msglen, pkt_len);
                continue;
            }
    
            //调用注册的路由业务
            _router.call(head.msgid, head.msglen, _read_buf+MESSAGE_HEAD_LEN, this);
        }
    }
        
    void udp_client::add_msg_router(int msgid, msg_callback *cb, void *user_data)
    {
        _router.register_msg_router(msgid, cb, user_data);
    }
    
    int udp_client::send_message(const char *data, int msglen, int msgid)
    {
        if (msglen > MESSAGE_LENGTH_LIMIT) {
            fprintf(stderr, "too large message to send\n");
            return -1;
        }
    
        msg_head head;
        head.msglen = msglen;
        head.msgid = msgid;
    
        memcpy(_write_buf,  &head, MESSAGE_HEAD_LEN);
        memcpy(_write_buf + MESSAGE_HEAD_LEN, data, msglen);
    
        int ret = sendto(_sockfd, _write_buf, msglen + MESSAGE_HEAD_LEN, 0, NULL, 0);
        if (ret == -1) {
            perror("sendto()..");
            return -1;
        }
    
        return ret;
    }
    

    ​ 客户端和服务端代码除了构造函数不同,其他基本差不多。接下来我们可以测试一下udp的通信功能

    12.3 完成Lars Reactor V0.10开发

    服务端

    server.cpp

    #include <string>
    #include <string.h>
    #include "config_file.h"
    #include "udp_server.h"
    
    //回显业务的回调函数
    void callback_busi(const char *data, uint32_t len, int msgid, net_connection *conn, void *user_data)
    {
        printf("callback_busi ...\n");
        //直接回显
        conn->send_message(data, len, msgid);
    }
    
    int main() 
    {
        event_loop loop;
    
        //加载配置文件
        config_file::setPath("./serv.conf");
        std::string ip = config_file::instance()->GetString("reactor", "ip", "0.0.0.0");
        short port = config_file::instance()->GetNumber("reactor", "port", 8888);
    
        printf("ip = %s, port = %d\n", ip.c_str(), port);
    
        udp_server server(&loop, ip.c_str(), port);
    
        //注册消息业务路由
        server.add_msg_router(1, callback_busi);
    
        loop.event_process();
    
        return 0;
    }
    

    客户端

    client.cpp

    #include <stdio.h>
    #include <string.h>
    
    #include "udp_client.h"
    
    
    //客户端业务
    void busi(const char *data, uint32_t len, int msgid, net_connection  *conn, void *user_data)
    {
        //得到服务端回执的数据 
        char *str = NULL;
        
        str = (char*)malloc(len+1);
        memset(str, 0, len+1);
        memcpy(str, data, len);
        printf("recv server: [%s]\n", str);
        printf("msgid: [%d]\n", msgid);
        printf("len: [%d]\n", len);
    }
    
    
    int main() 
    {
        event_loop loop;
    
        //创建udp客户端
        udp_client client(&loop, "127.0.0.1", 7777);
    
    
        //注册消息路由业务
        client.add_msg_router(1, busi);
    
        //发消息
        int msgid = 1; 
        const char *msg = "Hello Lars!";
    
        client.send_message(msg, strlen(msg), msgid);
    
        //开启事件监听
        loop.event_process();
    
        return 0;
    }
    

    启动服务端和客户端并允许,结果如下:

    server

    $ ./server 
    ip = 127.0.0.1, port = 7777
    msg_router init...
    server on 127.0.0.1:7777 is running...
    add msg cb msgid = 1
    call msgid = 1
    call data = Hello Lars!
    call msglen = 11
    callback_busi ...
    =======
    

    client

    $ ./client 
    msg_router init...
    add msg cb msgid = 1
    call msgid = 1
    call data = Hello Lars!
    call msglen = 11
    recv server: [Hello Lars!]
    msgid: [1]
    len: [11]
    =======
    

    关于作者:

    作者:Aceld(刘丹冰)

    mail: danbing.at@gmail.com
    github: https://github.com/aceld
    原创书籍gitbook: http://legacy.gitbook.com/@aceld

    原创声明:未经作者允许请勿转载, 如果转载请注明出处

    相关文章

      网友评论

          本文标题:(13)udp服务与客户端(Reactor部分)-【Lars-基

          本文链接:https://www.haomeiwen.com/subject/qqazuctx.html