美文网首页
(27)存储线程池及消息队列(Reporter部分)-【Lars

(27)存储线程池及消息队列(Reporter部分)-【Lars

作者: 刘丹冰Aceld | 来源:发表于2019-10-23 09:34 被阅读0次

    【Lars教程目录】

    Lars源代码
    https://github.com/aceld/Lars


    【Lars系统概述】
    第1章-概述
    第2章-项目目录构建


    【Lars系统之Reactor模型服务器框架模块】
    第1章-项目结构与V0.1雏形
    第2章-内存管理与Buffer封装
    第3章-事件触发EventLoop
    第4章-链接与消息封装
    第5章-Client客户端模型
    第6章-连接管理及限制
    第7章-消息业务路由分发机制
    第8章-链接创建/销毁Hook机制
    第9章-消息任务队列与线程池
    第10章-配置文件读写功能
    第11章-udp服务与客户端
    第12章-数据传输协议protocol buffer
    第13章-QPS性能测试
    第14章-异步消息任务机制
    第15章-链接属性设置功能


    【Lars系统之DNSService模块】
    第1章-Lars-dns简介
    第2章-数据库创建
    第3章-项目目录结构及环境构建
    第4章-Route结构的定义
    第5章-获取Route信息
    第6章-Route订阅模式
    第7章-Backend Thread实时监控


    【Lars系统之Report Service模块】
    第1章-项目概述-数据表及proto3协议定义
    第2章-获取report上报数据
    第3章-存储线程池及消息队列


    【Lars系统之LoadBalance Agent模块】
    第1章-项目概述及构建
    第2章-主模块业务结构搭建
    第3章-Report与Dns Client设计与实现
    第4章-负载均衡模块基础设计
    第5章-负载均衡获取Host主机信息API
    第6章-负载均衡上报Host主机信息API
    第7章-过期窗口清理与过载超时(V0.5)
    第8章-定期拉取最新路由信息(V0.6)
    第9章-负载均衡获取Route信息API(0.7)
    第10章-API初始化接口(V0.8)
    第11章-Lars Agent性能测试工具
    第12章- Lars启动工具脚本


    5) 存储线程池及消息队列

    ​ 我们现在的reporter_service的io入库操作,完全是在消息的callback中进行的,那么实际上,这回占用我们server的工作线程的阻塞时间,从而浪费cpu。所以我们应该将io的入库操作,交给一个专门做入库的消息队列线程池来做,这样我们的callback就会立刻返回该业务,从而可以继续处理下一个conn链接的消息事件业务。

    ​ 所以我们就要在此给reporter_service设计一个存储数据的线程池及配套的消息队列。当然这里面我们还是直接用写好的lars_reactor框架里的接口即可。

    lars_reporter/src/reporter_service.cpp

    #include "lars_reactor.h"
    #include "lars.pb.h"
    #include "store_report.h"
    #include <string>
    
    thread_queue<lars::ReportStatusRequest> **reportQueues = NULL;
    int thread_cnt = 0;
    
    void get_report_status(const char *data, uint32_t len, int msgid, net_connection *conn, void *user_data)
    {
        lars::ReportStatusRequest req;
    
        req.ParseFromArray(data, len);
    
        //将上报数据存储到db 
        StoreReport sr;
        sr.store(req);
    
        //轮询将消息平均发送到每个线程的消息队列中
        static int index = 0;
        //将消息发送给某个线程消息队列
        reportQueues[index]->send(req);
        index ++;
        index = index % thread_cnt;
    }
    
    void create_reportdb_threads()
    {
        thread_cnt = config_file::instance()->GetNumber("reporter", "db_thread_cnt", 3);
        
        //开线程池的消息队列
        reportQueues = new thread_queue<lars::ReportStatusRequest>*[thread_cnt];
    
        if (reportQueues == NULL) {
            fprintf(stderr, "create thread_queue<lars::ReportStatusRequest>*[%d], error", thread_cnt) ;
            exit(1);
        }
    
        for (int i = 0; i < thread_cnt; i++) {
            //给当前线程创建一个消息队列queue
            reportQueues[i] = new thread_queue<lars::ReportStatusRequest>();
            if (reportQueues == NULL) {
                fprintf(stderr, "create thread_queue error\n");
                exit(1);
            }
    
            pthread_t tid;
            int ret = pthread_create(&tid, NULL, store_main, reportQueues[i]);
            if (ret == -1)  {
                perror("pthread_create");
                exit(1);
            }
    
            pthread_detach(tid);
        }
    }
    
    
    int main(int argc, char **argv)
    {
        event_loop loop;
    
        //加载配置文件
        config_file::setPath("./conf/lars_reporter.conf");
        std::string ip = config_file::instance()->GetString("reactor", "ip", "0.0.0.0");
        short port = config_file::instance()->GetNumber("reactor", "port", 7779);
    
    
        //创建tcp server
        tcp_server server(&loop, ip.c_str(), port);
    
        //添加数据上报请求处理的消息分发处理业务
        server.add_msg_router(lars::ID_ReportStatusRequest, get_report_status);
    
        //为了防止在业务中出现io阻塞,那么需要启动一个线程池对IO进行操作的,接受业务的请求存储消息
        create_reportdb_threads();
      
        //启动事件监听
        loop.event_process(); 
    
        return 0;
    }
    

    ​ 这里主线程启动了线程池,根据配置文件的db_thread_cnt数量来开辟。每个线程都会执行store_main方法,我们来看一下实现

    lars_reporter/src/store_thread.cpp

    #include "lars.pb.h"
    #include "lars_reactor.h"
    #include "store_report.h"
    
    struct Args 
    {
        thread_queue<lars::ReportStatusRequest>* first;
        StoreReport *second;
    };
    
    //typedef void io_callback(event_loop *loop, int fd, void *args);
    void thread_report(event_loop *loop, int fd, void *args)
    {
        //1. 从queue里面取出需要report的数据(需要thread_queue)
        thread_queue<lars::ReportStatusRequest>* queue = ((Args*)args)->first;
        StoreReport *sr = ((Args*)args)->second;
    
        std::queue<lars::ReportStatusRequest> report_msgs;
    
        //1.1 从消息队列中取出全部的消息元素集合
        queue->recv(report_msgs);
        while ( !report_msgs.empty() ) {
            lars::ReportStatusRequest msg = report_msgs.front();
            report_msgs.pop();
    
            //2. 将数据存储到DB中(需要StoreReport)
            sr->store(msg);
        }
    }
    
    
    void *store_main(void *args)
    {
        //得到对应的thread_queue
        thread_queue<lars::ReportStatusRequest> *queue = (thread_queue<lars::ReportStatusRequest>*)args;
    
        //定义事件触发机制
        event_loop loop;
    
        //定义一个存储对象
        StoreReport sr; 
    
        Args callback_args;
        callback_args.first = queue;
        callback_args.second = &sr;
    
        queue->set_loop(&loop);
        queue->set_callback(thread_report, &callback_args);
    
    
        //启动事件监听
        loop.event_process();
    
        return NULL;
    }
    

    ​ 每个线程都会绑定一个thread_queue<lars::ReportStatusRequest>,然后一个线程里面有一个loop,来监控消息队列是否有消息事件过来,如果有消息实现过来,针对每个消息会触发thread_report()方法, 在thread_report()中,我们就直接将lars::ReportStatusRequest消息存储到db中。

    ​ 那么,由谁来给每个线程的thread_queue发送消息呢,就是agent/客户端发送的请求,我们在处理lars::ID_ReportStatusRequest 消息分发业务的时候调用get_report_status()来触发。

    lars_reporter/src/reporter_service.cpp

    void get_report_status(const char *data, uint32_t len, int msgid, net_connection *conn, void *user_data)
    {
        lars::ReportStatusRequest req;
    
        req.ParseFromArray(data, len);
    
        //将上报数据存储到db 
        StoreReport sr;
        sr.store(req);
    
        //轮询将消息平均发送到每个线程的消息队列中
        static int index = 0;
        //将消息发送给某个线程消息队列
        reportQueues[index]->send(req);
        index ++;
        index = index % thread_cnt;
    }
    

    ​ 这里的分发机制,是采用最轮询的方式,是每个线程依次分配,去调用thread_queuesend()方法,将消息发送给消息队列。

    ​ 最后我们进行测试,效果跟之前的效果是一样的。我们现在已经集成进来了存储线程池,现在就不用担心在处理业务的时候,因为DB等的io阻塞,使cpu得不到充分利用了。


    关于作者:

    作者:Aceld(刘丹冰)

    mail: danbing.at@gmail.com
    github: https://github.com/aceld
    原创书籍gitbook: http://legacy.gitbook.com/@aceld

    原创声明:未经作者允许请勿转载, 如果转载请注明出处

    相关文章

      网友评论

          本文标题:(27)存储线程池及消息队列(Reporter部分)-【Lars

          本文链接:https://www.haomeiwen.com/subject/jbehvctx.html