美文网首页
(11)消息任务队列与线程池(Reactor部分)-【Lars-

(11)消息任务队列与线程池(Reactor部分)-【Lars-

作者: 刘丹冰Aceld | 来源:发表于2019-09-26 09:30 被阅读0次

    【Lars教程目录】

    Lars源代码
    https://github.com/aceld/Lars


    【Lars系统概述】
    第1章-概述
    第2章-项目目录构建


    【Lars系统之Reactor模型服务器框架模块】
    第1章-项目结构与V0.1雏形
    第2章-内存管理与Buffer封装
    第3章-事件触发EventLoop
    第4章-链接与消息封装
    第5章-Client客户端模型
    第6章-连接管理及限制
    第7章-消息业务路由分发机制
    第8章-链接创建/销毁Hook机制
    第9章-消息任务队列与线程池
    第10章-配置文件读写功能
    第11章-udp服务与客户端
    第12章-数据传输协议protocol buffer
    第13章-QPS性能测试
    第14章-异步消息任务机制
    第15章-链接属性设置功能


    【Lars系统之DNSService模块】
    第1章-Lars-dns简介
    第2章-数据库创建
    第3章-项目目录结构及环境构建
    第4章-Route结构的定义
    第5章-获取Route信息
    第6章-Route订阅模式
    第7章-Backend Thread实时监控


    【Lars系统之Report Service模块】
    第1章-项目概述-数据表及proto3协议定义
    第2章-获取report上报数据
    第3章-存储线程池及消息队列


    【Lars系统之LoadBalance Agent模块】
    第1章-项目概述及构建
    第2章-主模块业务结构搭建
    第3章-Report与Dns Client设计与实现
    第4章-负载均衡模块基础设计
    第5章-负载均衡获取Host主机信息API
    第6章-负载均衡上报Host主机信息API
    第7章-过期窗口清理与过载超时(V0.5)
    第8章-定期拉取最新路由信息(V0.6)
    第9章-负载均衡获取Route信息API(0.7)
    第10章-API初始化接口(V0.8)
    第11章-Lars Agent性能测试工具
    第12章- Lars启动工具脚本


    2-Lars-reactor.png

    我们接下来要设计线程池和与之对应的消息队列。具体的总体形势应该是这样的

    9-thread_pool.png
    这里面有几个类型,thread_pool就是我们要创建的线程池,这里面会有很多thread其中每个thread都会启动一个epoll也就是我们封装好的event_loop来监控各自创建好的tcp_conn的读写事件。每个thread都会有一个thread_queue消息任务队列与之绑定,每个thread_queue里面会接收task_msg任务类型。

    10.1 消息任务类型

    lars_reactor/include/task_msg.h

    #pragma  once
    #include "event_loop.h"
    
    struct task_msg
    {
        enum TASK_TYPE
        {
            NEW_CONN,   //新建链接的任务
            NEW_TASK,   //一般的任务
        };
    
        TASK_TYPE type; //任务类型
    
        //任务的一些参数
        
        union {
            //针对 NEW_CONN新建链接任务,需要传递connfd
            int connfd;
    
            /*====  暂时用不上 ==== */
            //针对 NEW_TASK 新建任务, 
            //那么可以给一个任务提供一个回调函数
            struct {
                void (*task_cb)(event_loop*, void *args);
                void *args;
            };
        };
    };
    

    ​ 这里面task_msg一共有两个类型的type,一个是新链接的任务,一个是普通任务。两个任务所携带的参数不同,所以用了一个union。

    10.2 消息任务队列

    lars_reactor/include/thread_queue.h

    #pragma once
    
    #include <queue>
    #include <pthread.h>
    #include <sys/eventfd.h>
    #include <stdio.h>
    #include <unistd.h>
    #include "event_loop.h"
    
    /*
     *
     * 每个thread对应的 消息任务队列
     *
     * */
    template <typename T>
    class thread_queue
    {
    public:
        thread_queue()
        {
            _loop = NULL;
            pthread_mutex_init(&_queue_mutex, NULL);
            _evfd = eventfd(0, EFD_NONBLOCK);
            if (_evfd == -1) {
                perror("evenfd(0, EFD_NONBLOCK)");
                exit(1);
            }
        }
    
        ~thread_queue()
        {
            pthread_mutex_destroy(&_queue_mutex);
            close(_evfd);
        }
    
        //向队列添加一个任务
        void send(const T& task) {
            //触发消息事件的占位传输内容
            unsigned long long idle_num = 1;
    
            pthread_mutex_lock(&_queue_mutex);
            //将任务添加到队列
            _queue.push(task);
    
            //向_evfd写,触发对应的EPOLLIN事件,来处理该任务
            int ret = write(_evfd, &idle_num, sizeof(unsigned long long));
            if (ret == -1) {
                perror("_evfd write");
            }
    
            pthread_mutex_unlock(&_queue_mutex);
        }
    
    
        //获取队列,(当前队列已经有任务)
        void recv(std::queue<T>& new_queue) {
            unsigned int long long idle_num = 1;
            pthread_mutex_lock(&_queue_mutex);
            //把占位的数据读出来,确保底层缓冲没有数据存留
            int ret = read(_evfd, &idle_num, sizeof(unsigned long long));
            if (ret == -1) {
                perror("_evfd read");
            }
    
            //将当前的队列拷贝出去,将一个空队列换回当前队列,同时清空自身队列,确保new_queue是空队列
            std::swap(new_queue, _queue);
    
            pthread_mutex_unlock(&_queue_mutex);
        }
    
    
        //设置当前thead_queue是被哪个事件触发event_loop监控
        void set_loop(event_loop *loop) {
            _loop = loop;  
        }
    
        //设置当前消息任务队列的 每个任务触发的回调业务
        void set_callback(io_callback *cb, void *args = NULL)
        {
            if (_loop != NULL) {
                _loop->add_io_event(_evfd, cb, EPOLLIN, args);
            }
        }
    
        //得到当前loop
        event_loop * get_loop() {
            return _loop;
        }
    
        
    private:
        int _evfd;            //触发消息任务队列读取的每个消息业务的fd
        event_loop *_loop;    //当前消息任务队列所绑定在哪个event_loop事件触发机制中
        std::queue<T> _queue; //队列
        pthread_mutex_t _queue_mutex; //进行添加任务、读取任务的保护锁
    };
    

    ​ 一个模板类,主要是消息任务队列里的元素类型未必一定是task_msg类型。

    thread_queue需要绑定一个event_loop。来触发消息到达,捕获消息并且触发处理消息业务的动作。

    ​ 这里面有个_evfd是为了触发消息队列消息到达,处理该消息作用的,将_evfd加入到对应线程的event_loop中,然后再通过set_callback设置一个通用的该queue全部消息所触发的处理业务call_back,在这个call_back里开发者可以自定义实现一些处理业务流程。

    1. 通过send将任务发送给消息队列。
    2. 通过event_loop触发注册的io_callback得到消息队列里的任务。
    3. 在io_callback中调用recv取得task任务,根据任务的不同类型,处理自定义不同业务流程。

    10.3 线程池

    ​ 接下来,我们定义线程池,将thread_queuethread_pool进行关联。

    lars_reactor/include/thread_pool.h

    #pragma once
    
    #include <pthread.h>
    #include "task_msg.h"
    #include "thread_queue.h"
    
    class thread_pool
    {
    public:
        //构造,初始化线程池, 开辟thread_cnt个
        thread_pool(int thread_cnt);
    
        //获取一个thead
        thread_queue<task_msg>* get_thread();
    
    private:
    
        //_queues是当前thread_pool全部的消息任务队列头指针
        thread_queue<task_msg> ** _queues; 
    
        //当前线程池中的线程个数
        int _thread_cnt;
    
        //已经启动的全部therad编号
        pthread_t * _tids;
    
        //当前选中的线程队列下标
        int _index;
    };
    

    属性:

    _queues:是thread_queue集合,和当前线程数量一一对应,每个线程对应一个queue。里面存的元素是task_msg

    _tids:保存线程池中每个线程的ID。

    _thread_cnt:当前线程的个数.

    _index:表示外层在选择哪个thead处理任务时的一个下标,因为是轮询处理,所以需要一个下标记录。

    方法:

    thread_pool():构造函数,初始化线程池。

    get_thread():通过轮询方式,获取一个线程的thread_queue.

    lars_reactor/src/thread_pool.cpp

    #include "thread_pool.h"
    #include "event_loop.h"
    #include "tcp_conn.h"
    #include <unistd.h>
    #include <stdio.h>
    
    /*
     * 一旦有task消息过来,这个业务是处理task消息业务的主流程
     *
     * 只要有人调用 thread_queue:: send()方法就会触发次函数
    */
    void deal_task_message(event_loop *loop, int fd, void *args)
    {
        //得到是哪个消息队列触发的 
        thread_queue<task_msg>* queue = (thread_queue<task_msg>*)args;
    
        //将queue中的全部任务取出来
        std::queue<task_msg> tasks;
        queue->recv(tasks);
    
        while (tasks.empty() != true) {
            task_msg task = tasks.front();
    
            //弹出一个元素
            tasks.pop();
    
            if (task.type == task_msg::NEW_CONN) {
                //是一个新建链接的任务
                //并且将这个tcp_conn加入当当前线程的loop中去监听
                tcp_conn *conn = new tcp_conn(task.connfd, loop);
                if (conn == NULL) {
                    fprintf(stderr, "in thread new tcp_conn error\n");
                    exit(1);
                }
    
                printf("[thread]: get new connection succ!\n");
            }
            else if (task.type == task_msg::NEW_TASK) {
                //是一个新的普通任务
                //TODO
            } 
            else {
                //其他未识别任务
                fprintf(stderr, "unknow task!\n");
            }
        }
    }
    
    //一个线程的主业务main函数
    void *thread_main(void *args)
    {
        thread_queue<task_msg> *queue = (thread_queue<task_msg>*)args;
    
        //每个线程都应该有一个event_loop来监控客户端链接的读写事件
        event_loop *loop = new event_loop();
        if (loop == NULL) {
            fprintf(stderr, "new event_loop error\n");
            exit(1);
        }
    
        //注册一个触发消息任务读写的callback函数 
        queue->set_loop(loop);
        queue->set_callback(deal_task_message, queue);
    
        //启动阻塞监听
        loop->event_process();
    
        return NULL;
    }
    
    
    thread_pool::thread_pool(int thread_cnt)
    {
        _index = 0;
        _queues = NULL;
        _thread_cnt = thread_cnt;
        if (_thread_cnt <= 0) {
            fprintf(stderr, "_thread_cnt < 0\n");
            exit(1);
        }
    
        //任务队列的个数和线程个数一致
        _queues = new thread_queue<task_msg>*[thread_cnt];
        _tids = new pthread_t[thread_cnt];
    
        int ret;
        for (int i = 0; i < thread_cnt; ++i) {
            //创建一个线程
            printf("create %d thread\n", i);
            //给当前线程创建一个任务消息队列
            _queues[i] = new thread_queue<task_msg>();
            ret = pthread_create(&_tids[i], NULL, thread_main, _queues[i]);
            if (ret == -1) {
                perror("thread_pool, create thread");
                exit(1);
            }
    
            //将线程脱离
            pthread_detach(_tids[i]);
        }
    }
    
    thread_queue<task_msg>* thread_pool::get_thread()
    {
        if (_index == _thread_cnt) {
            _index = 0; 
        }
    
        return _queues[_index];
    }
    

    ​ 这里主要看deal_task_message()方法,是处理收到的task任务的。目前我们只对NEW_CONN类型的任务进行处理,一般任务先不做处理,因为暂时用不上。

    NEW_CONN的处理主要是让当前线程创建链接,并且将该链接由当前线程的event_loop接管。

    ​ 接下来我们就要将线程池添加到reactor框架中去。

    10.4 reactor线程池关联

    ​ 将线程池添加到tcp_server中。

    lars_reactor/include/tcp_server.h

    #pragma once
    
    #include <netinet/in.h>
    #include "event_loop.h"
    #include "tcp_conn.h"
    #include "message.h"
    #include "thread_pool.h"
    
    
    class tcp_server
    { 
    public: 
            // ...
            // ...
    private:
            // ...
      
        //线程池
        thread_pool *_thread_pool;
    }; 
    

    在构造函数中,添加_thread_pool的初始化工作。并且在accept成功之后交给线程处理客户端的读写事件。

    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>
    #include <strings.h>
    
    #include <unistd.h>
    #include <signal.h>
    #include <sys/types.h>          /* See NOTES */
    #include <sys/socket.h>
    #include <arpa/inet.h>
    #include <errno.h>
    
    #include "tcp_server.h"
    #include "tcp_conn.h"
    #include "reactor_buf.h"
    
    
    //server的构造函数
    tcp_server::tcp_server(event_loop *loop, const char *ip, uint16_t port)
    {
        // ...
    
        //6 创建链接管理
        _max_conns = MAX_CONNS;
        //创建链接信息数组
        conns = new tcp_conn*[_max_conns+3];//3是因为stdin,stdout,stderr 已经被占用,再新开fd一定是从3开始,所以不加3就会栈溢出
        if (conns == NULL) {
            fprintf(stderr, "new conns[%d] error\n", _max_conns);
            exit(1);
        }
    
        //7 =============创建线程池=================
        int thread_cnt = 3;//TODO 从配置文件中读取
        if (thread_cnt > 0) {
            _thread_pool = new thread_pool(thread_cnt);
            if (_thread_pool == NULL) {
                fprintf(stderr, "tcp_server new thread_pool error\n");
                exit(1);
            }
        }
        // ========================================
    
        //8 注册_socket读事件-->accept处理
        _loop->add_io_event(_sockfd, accept_callback, EPOLLIN, this);
    }
    
    
    
    //开始提供创建链接服务
    void tcp_server::do_accept()
    {
        int connfd;    
        while(true) {
            //accept与客户端创建链接
            printf("begin accept\n");
            connfd = accept(_sockfd, (struct sockaddr*)&_connaddr, &_addrlen);
            if (connfd == -1) {
                if (errno == EINTR) {
                    fprintf(stderr, "accept errno=EINTR\n");
                    continue;
                }
                else if (errno == EMFILE) {
                    //建立链接过多,资源不够
                    fprintf(stderr, "accept errno=EMFILE\n");
                }
                else if (errno == EAGAIN) {
                    fprintf(stderr, "accept errno=EAGAIN\n");
                    break;
                }
                else {
                    fprintf(stderr, "accept error\n");
                    exit(1);
                }
            }
            else {
                //accept succ!
                int cur_conns;
                get_conn_num(&cur_conns);
    
                //1 判断链接数量
                if (cur_conns >= _max_conns) {
                    fprintf(stderr, "so many connections, max = %d\n", _max_conns);
                    close(connfd);
                }
                else {
                                    // ========= 将新连接由线程池处理 ==========
                    if (_thread_pool != NULL) {
                        //启动多线程模式 创建链接
                        //1 选择一个线程来处理
                        thread_queue<task_msg>* queue = _thread_pool->get_thread();
                        //2 创建一个新建链接的消息任务
                        task_msg task;
                        task.type = task_msg::NEW_CONN;
                        task.connfd = connfd;
    
                        //3 添加到消息队列中,让对应的thread进程event_loop处理
                        queue->send(task);
                     // =====================================
                    }
                    else {
                        //启动单线程模式
                        tcp_conn *conn = new tcp_conn(connfd, _loop);
                        if (conn == NULL) {
                            fprintf(stderr, "new tcp_conn error\n");
                            exit(1);
                        }
                        printf("[tcp_server]: get new connection succ!\n");
                        break;
                    }
                }
            }
        }
    }
    
    

    10.5 完成Lars ReactorV0.8开发

    ​ 0.8版本的server.cpp和client.cpp是不用改变的。开启服务端和客户端观察执行结果即可。

    服务端:

    $ ./server 
    msg_router init...
    create 0 thread
    create 1 thread
    create 2 thread
    add msg cb msgid = 1
    add msg cb msgid = 2
    begin accept
    begin accept
    [thread]: get new connection succ!
    read data: Hello Lars!
    call msgid = 1
    call data = Hello Lars!
    call msglen = 11
    callback_busi ...
    =======
    

    客户端

    $ ./client 
    msg_router init...
    do_connect EINPROGRESS
    add msg cb msgid = 1
    add msg cb msgid = 101
    connect 127.0.0.1:7777 succ!
    do write over, del EPOLLOUT
    call msgid = 101
    call data = welcome! you online..�
    call msglen = 21
    recv server: [welcome! you online..]
    msgid: [101]
    len: [21]
    =======
    call msgid = 1
    call data = Hello Lars!
    call msglen = 11
    recv server: [Hello Lars!]
    msgid: [1]
    len: [11]
    =======
    

    ​ 我们会发现,链接已经成功创建成功,并且是由于线程处理的读写任务。


    关于作者:

    作者:Aceld(刘丹冰)

    mail: danbing.at@gmail.com
    github: https://github.com/aceld
    原创书籍gitbook: http://legacy.gitbook.com/@aceld

    原创声明:未经作者允许请勿转载, 如果转载请注明出处

    相关文章

      网友评论

          本文标题:(11)消息任务队列与线程池(Reactor部分)-【Lars-

          本文链接:https://www.haomeiwen.com/subject/gnsguctx.html