美文网首页
(36)负载均衡获取Route信息API(0.7)(LoadBa

(36)负载均衡获取Route信息API(0.7)(LoadBa

作者: 刘丹冰Aceld | 来源:发表于2019-11-29 14:00 被阅读0次

    【Lars教程目录】

    Lars源代码
    https://github.com/aceld/Lars


    【Lars系统概述】
    第1章-概述
    第2章-项目目录构建


    【Lars系统之Reactor模型服务器框架模块】
    第1章-项目结构与V0.1雏形
    第2章-内存管理与Buffer封装
    第3章-事件触发EventLoop
    第4章-链接与消息封装
    第5章-Client客户端模型
    第6章-连接管理及限制
    第7章-消息业务路由分发机制
    第8章-链接创建/销毁Hook机制
    第9章-消息任务队列与线程池
    第10章-配置文件读写功能
    第11章-udp服务与客户端
    第12章-数据传输协议protocol buffer
    第13章-QPS性能测试
    第14章-异步消息任务机制
    第15章-链接属性设置功能


    【Lars系统之DNSService模块】
    第1章-Lars-dns简介
    第2章-数据库创建
    第3章-项目目录结构及环境构建
    第4章-Route结构的定义
    第5章-获取Route信息
    第6章-Route订阅模式
    第7章-Backend Thread实时监控


    【Lars系统之Report Service模块】
    第1章-项目概述-数据表及proto3协议定义
    第2章-获取report上报数据
    第3章-存储线程池及消息队列


    【Lars系统之LoadBalance Agent模块】
    第1章-项目概述及构建
    第2章-主模块业务结构搭建
    第3章-Report与Dns Client设计与实现
    第4章-负载均衡模块基础设计
    第5章-负载均衡获取Host主机信息API
    第6章-负载均衡上报Host主机信息API
    第7章-过期窗口清理与过载超时(V0.5)
    第8章-定期拉取最新路由信息(V0.6)
    第9章-负载均衡获取Route信息API(0.7)
    第10章-API初始化接口(V0.8)
    第11章-Lars Agent性能测试工具
    第12章- Lars启动工具脚本


    10) 负载均衡获取Route信息API(0.7)

    10.1 proto通信协议定义

    base/proto/lars.proto

    /* Lars系统的消息ID */
    enum MessageId {
        ID_UNKNOW                = 0;  //proto3 enum第一个属性必须是0,用来占位
        ID_GetRouteRequest       = 1;  //向DNS请求Route对应的关系的消息ID
        ID_GetRouteResponse      = 2;  //DNS回复的Route信息的消息ID
        ID_ReportStatusRequest   = 3;  //上报host调用状态信息请求消息ID
        ID_GetHostRequest        = 4;  //API 发送请求host信息给 Lb Agent模块 消息ID
        ID_GetHostResponse       = 5;  //agent 回执给 API host信息的 消息ID
        ID_ReportRequest         = 6;  //API report get_host的调用结果给agent的 消息ID
    // =======================================================
        ID_API_GetRouteRequest   = 7;  //API 请求agent某个modid/cmdid的全部hosts信息的route 消息ID
        ID_API_GetRouteResponse  = 8;  //agent 回执给 API的全部hosts的route信息 消息ID
    // =======================================================
    }
    
    

    ​ 增加两个message ID, ID_API_GetRouteRequestID_API_GetRouteResponse,主要是针对API层获取route全部的host节点信息通信使用。

    10.2 Lars-API:get_route()方法客户端实现

    api/cpp/lars_api/lars_api.h

    typedef std::pair<std::string, int> ip_port;
    typedef std::vector<ip_port> route_set;
    typedef route_set::iterator route_set_it;
    

    api/cpp/lars_api/lars_api.cpp

    //lars 系统获取某modid/cmdid全部的hosts(route)信息
    int lars_client::get_route(int modid, int cmdid, route_set &route)
    {
        //1. 封装请求消息
        lars::GetRouteRequest req;
        req.set_modid(modid);
        req.set_cmdid(cmdid);
    
        //2. send
        char write_buf[4096], read_buf[80*1024];
        //消息头
        msg_head head;
        head.msglen = req.ByteSizeLong();
        head.msgid = lars::ID_API_GetRouteRequest;
        memcpy(write_buf, &head, MESSAGE_HEAD_LEN);
        
        //消息体
        req.SerializeToArray(write_buf+MESSAGE_HEAD_LEN, head.msglen);
    
        //简单的hash来发给对应的agent udp server
        int index = (modid + cmdid) %3;
        int ret = sendto(_sockfd[index], write_buf, head.msglen + MESSAGE_HEAD_LEN, 0, NULL, 0);
        if (ret == -1) {
            perror("sendto");
            return lars::RET_SYSTEM_ERROR;
        }
        
        //3. recv
        lars::GetRouteResponse rsp;
    
        int message_len = recvfrom(_sockfd[index], read_buf, sizeof(read_buf), 0, NULL, NULL);
        if (message_len == -1) {
            perror("recvfrom");
            return lars::RET_SYSTEM_ERROR;
        }
    
        //消息头
        memcpy(&head, read_buf, MESSAGE_HEAD_LEN);
        if (head.msgid != lars::ID_API_GetRouteResponse) {
            fprintf(stderr, "message ID error!\n");
            return lars::RET_SYSTEM_ERROR;
        }
    
        //消息体 
        ret = rsp.ParseFromArray(read_buf + MESSAGE_HEAD_LEN, message_len - MESSAGE_HEAD_LEN);
        if (!ret) {
            fprintf(stderr, "message format error: head.msglen = %d, message_len = %d, message_len - MESSAGE_HEAD_LEN = %d, head msgid = %d, ID_GetHostResponse = %d\n", head.msglen, message_len, message_len-MESSAGE_HEAD_LEN, head.msgid, lars::ID_GetRouteResponse);
            return lars::RET_SYSTEM_ERROR;
        }
    
        if (rsp.modid() != modid || rsp.cmdid() != cmdid) {
            fprintf(stderr, "message format error\n");
            return lars::RET_SYSTEM_ERROR;
        }
    
        //4 处理消息
        for (int i = 0; i < rsp.host_size(); i++) {
            const lars::HostInfo &host = rsp.host(i);
            struct in_addr inaddr;
            inaddr.s_addr = host.ip();
            std::string ip = inet_ntoa(inaddr);
            int port = host.port();
            route.push_back(ip_port(ip,port));
        }
    
        return lars::RET_SUCC;
    }
    
    

    10.3 Agent UDP Server处理API-get_route请求

    lars_loadbalance_agent/src/agent_udp_server.cpp

    static void get_route_cb(const char *data, uint32_t len, int msgid, net_connection *net_conn, void *user_data)
    {
    
        //解析api发送的请求包
        lars::GetRouteRequest req;         
    
        req.ParseFromArray(data, len);
        int modid = req.modid();
        int cmdid = req.cmdid();
    
        //设置回执消息
        lars::GetRouteResponse rsp;
        rsp.set_modid(modid);
        rsp.set_cmdid(cmdid);
    
        route_lb *ptr_route_lb = (route_lb*)user_data;
        
        //调用route_lb的获取host方法,得到rsp返回结果
        ptr_route_lb->get_route(modid, cmdid, rsp);
    
        //打包回执给api消息
        std::string responseString; 
        rsp.SerializeToString(&responseString);
        net_conn->send_message(responseString.c_str(), responseString.size(), lars::ID_API_GetRouteResponse);
        
    }
    
    
    void * agent_server_main(void * args)
    {
    
        // ....
    
        //给server注册消息分发路由业务,针对ID_API_GetRouteRequest处理
        server.add_msg_router(lars::ID_API_GetRouteRequest, get_route_cb, r_lb[port-8888]);
    
    
        // ...
    
        return NULL;
    }
    

    针对ID_API_GetRouteRequest添加一个消息处理方法,在回调业务中,通过route_lbget_route()方法获取信息.我们来实现这个方法。

    lars_loadbalance_agent/src/route_lb.cpp

    //agent获取某个modid/cmdid的全部主机,将返回的主机结果存放在rsp中
    int route_lb::get_route(int modid, int cmdid, lars::GetRouteResponse &rsp)
    {
        int ret = lars::RET_SUCC;
    
        //1. 得到key
        uint64_t key = ((uint64_t)modid << 32) + cmdid;
    
        pthread_mutex_lock(&_mutex);
        //2. 当前key已经存在_route_lb_map中
        if (_route_lb_map.find(key) != _route_lb_map.end()) {
            //2.1 取出对应的load_balance
            load_balance *lb = _route_lb_map[key];
    
            std::vector<host_info*> vec;
            lb->get_all_hosts(vec);
    
            for (std::vector<host_info*>::iterator it = vec.begin(); it != vec.end(); it++) {
                lars::HostInfo host;
                host.set_ip((*it)->ip);
                host.set_port((*it)->port);
                rsp.add_host()->CopyFrom(host);
            }
    
            //超时重拉路由
            //检查是否要重新拉路由信息
            //若路由并没有处于PULLING状态,且有效期已经超时,则重新拉取
            if (lb->status == load_balance::NEW && time(NULL) - lb->last_update_time > lb_config.update_timeout) {
                lb->pull();
            }
        }
        //3. 当前key不存在_route_lb_map中
        else {
            //3.1 新建一个load_balance
            load_balance *lb = new load_balance(modid, cmdid);
            if (lb == NULL) {
                fprintf(stderr, "no more space to create loadbalance\n");
                exit(1);
            }
    
            //3.2 新建的load_balance加入到map中
            _route_lb_map[key] = lb;
    
            //3.3 从dns service服务拉取具体的host信息
            lb->pull();
    
            ret = lars::RET_NOEXIST;
        }
        pthread_mutex_unlock(&_mutex);
    
        return ret;
    }
    

    其中,load_balanceget_all_hosts()方法实现如下:

    lars_loadbalance_agent/src/load_balance.cpp

    //获取当前挂载下的全部host信息 添加到vec中
    void load_balance::get_all_hosts(std::vector<host_info*> &vec)
    {
        for (host_map_it it = _host_map.begin(); it != _host_map.end(); it++) {
            host_info *hi = it->second;
            vec.push_back(hi);
        }
    }
    

    接下来,我们可以简单测试一些获取route信息的api

    api/cpp/example/example.cpp

    #include "lars_api.h"
    #include <iostream>
    
    
    void usage()
    {
        printf("usage: ./example [modid] [cmdid]\n");
    }
    
    int main(int argc, char **argv)
    {
        if (argc != 3) {
            usage();
            return 1;
        }
    
        int modid = atoi(argv[1]);
        int cmdid = atoi(argv[2]);
        lars_client api;
    
        std::string ip; 
        int port;
    
        route_set route;
        int ret = api.get_route(modid, cmdid, route);
        if (ret == 0) {
            std::cout << "get route succ!" << std::endl;
            for (route_set_it it = route.begin(); it != route.end(); it++) {
                std::cout << "ip = " << (*it).first << ", port = " << (*it).second << std::endl;
            }
        }
    
        ret = api.get_host(modid, cmdid, ip, port);
        if (ret == 0) {
            std::cout << "host is " << ip << ":" << port << std::endl;
    
            //上报调用结果
            api.report(modid, cmdid, ip, port, 0);
        }
    
        return 0;
    }
    

    关于作者:

    作者:Aceld(刘丹冰)

    mail: danbing.at@gmail.com
    github: https://github.com/aceld
    原创书籍gitbook: http://legacy.gitbook.com/@aceld

    原创声明:未经作者允许请勿转载, 如果转载请注明出处

    相关文章

      网友评论

          本文标题:(36)负载均衡获取Route信息API(0.7)(LoadBa

          本文链接:https://www.haomeiwen.com/subject/xwxgwctx.html