美文网首页转载部分
libevent 线程池的设计

libevent 线程池的设计

作者: liualiu | 来源:发表于2019-05-11 19:35 被阅读0次

    Thread pool

    背景

    在我所做过的一个基于libevent项目中, 我所使用的线程模型是 one event_base per thread + thread pool 模型。每个线程最多有一个event_base, 每一个 TCP 连接必须由某一个event_base 管理,所有这个线程的 IO 都会转移到这个 event_base 上面来处理。换句话来说,一个 file descriptor 只能由一个线程来读写。这样,我们就很方便地把不同的 TCP 连接放到不同的线程里面去。

    一个节点支持多线程,它有两种模式

    • 单线程, accept 与 TCP 的消息处理在同一个线程做 IO
    • 多线程, accept 有一个专门的线程接受连接,然后创建一个新的 Thread-Pool, 新的连接会按round-robin的方式分配。

    具体实现

    在其他的线程中,只要拿到一个 libevent_thread_t 的对象,往里面的 base 添加事件就好了。

    typedef struct libevent_thread_s
    {
        pthread_t thread_id;    //  pid of this thread
        struct event_base * base;   // libevent handle this thread uses
        
        // this pipe is used to stop the event_base from its own thread
        struct event * notify_event;
        int notify_receive_fd;
        int notify_send_fd;
        int no;                 // the thread number, which is needed for test
    } libevent_thread_t;
    

    创建新线程调用的函数(一直循环):

    static void* worker_libevent(void *arg)
    {
        libevent_thread_t* me = (libevent_thread_t *)arg;
        event_base_dispatch(me->base);
        return NULL;
    }
    
    

    虽然其他的线程可以往这个线程添加事件,但是当一个 libevent_thread_t 对象释放的时候, 其他的线程不能 break 这个线程的 event_base,只能是这个线程自己 break 自己的 event_base,所以在实现上我们可以首先要打开一个管道

    libevent_thread_t* libevent_thread_new(){
        int fds[2];
        if(pipe(fds)){
            printf("Can't create notify pipe");
            return NULL;
        }
        pr_libevent_thread_t* t = g_new0(pr_libevent_thread_t, 1);
        t->notify_receive_fd = fds[0];
        t->notify_send_fd = fds[1];
        setup_thread(t);
        return t;
    }
    
    

    为这个管道一端的 fd 在event_base上面添加监听事件,有数据可读的时候就 break event_base 就可以退出了

    static void thread_libevent_process(int fd, short which, void * arg)
    {
        libevent_thread_t* me = (libevent_thread_t *)arg;
        char buf[1];
        read(fd, buf, 1);
        event_base_loopbreak(me->base);
    }
    
    static int setup_thread(libevent_thread_t* me)
    {
        me->base = event_base_new();
        if(!me->base)
        {
            printf( "Can't allocate event base\n");
            return -1;
        }
        me->notify_event = event_new(me->base, me->notify_receive_fd, EV_READ | EV_PERSIST,
                                     thread_libevent_process, me);
        if(event_add(me->notify_event, NULL) == -1)
        {
            printf("Can't monitor libevent notify pipe\n");
            return -1;
        }
        return 0;
    }
    

    所以当我们想要退出时,向 fd 发送数据

    void libevent_thread_wakeup(libevent_thread_t* me)
    {
        char one[1];
        one[0] = 1;
        write(me->notify_send_fd, one, sizeof(one));
    }
    
    

    至于线程池,就是一个 libevent_thread_t 的数组,通过next来指定函数返回的线程,其中 n 为线程池容量大小, next 自增到 n 时变成 0

    libevent_thread_t* event_pool_get_next_thread(libevent_thread_pool_t*);
    
    typedef struct libevent_thread_pool_s
    {
        int next;
        struct spinlock lock;
        int n;
        pr_libevent_thread_t** slot;
    } libevent_thread_pool_t;
    
    

    注意事项

    在多线程环境中,libevent 的 event_base 的 loopbreak必须由他自己的线程来实现,所以其他线程只能是通过管道来通知。
    在一个线程往另一个线程的 event_base 添加事件的时候,也就是在多个线程对于一个 event_base 操作的时候,event_base 需要对它自己的数据结构加锁,所以在使用线程池或者写多线程的程序的时候,开始时需要调用, 文档

    evthread_use_pthreads()
    

    相关文章

      网友评论

        本文标题:libevent 线程池的设计

        本文链接:https://www.haomeiwen.com/subject/ynrfaqtx.html