美文网首页程序员
memcached源码分析-网络模块

memcached源码分析-网络模块

作者: saltcc | 来源:发表于2019-04-20 14:22 被阅读15次

1.概述

memcached网络模块是基于libevent库开发的,主要分为两个模块:连接监听线程,工作线程。连接监听线程是用来监听来自客户端连接的,工作线程主要是用来完成具体业务逻辑处理。

网络模块模型


网络模块时序图


2.连接监听线程(主线程)

1.初始化主线程libevent实例,用于监听来自客户端的连接
2.work线程的资源分配与初始化

int main (int argc, char **argv) {
    //.......
   /* initialize main thread libevent instance */
#if defined(LIBEVENT_VERSION_NUMBER) && LIBEVENT_VERSION_NUMBER >= 0x02000101
    /* If libevent version is larger/equal to 2.0.2-alpha, use newer version */
    struct event_config *ev_config;
    ev_config = event_config_new();
    event_config_set_flag(ev_config, EVENT_BASE_FLAG_NOLOCK);
    main_base = event_base_new_with_config(ev_config);
    event_config_free(ev_config);
#else
    /* Otherwise, use older API */
    main_base = event_init();
    //main_base 主线程的libevent实例,主要用于监听来自客户端连接
#endif
    //.......
    //work线程相关初始化
    memcached_thread_init(settings.num_threads, storage);
    //.......
    errno = 0;
    //服务端socket绑定ip与端口进行连接监听
    if (settings.port && server_sockets(settings.port, tcp_transport,
                                       portnumber_file)) {
        vperror("failed to listen on TCP port %d", settings.port);
        exit(EX_OSERR);
    }
    //.......
    /* enter the event loop */
    if (event_base_loop(main_base, 0) != 0) {
        retval = EXIT_FAILURE;
    }
    //.......
    return retval
}

server_sockets根据配置信息开启服务端socket的监听

static int server_sockets(int port, enum network_transport transport,
    FILE *portnumber_file) {

    if (settings.inter == NULL) {
        return server_socket(settings.inter, port, transport, portnumber_file);
    }else{
        // tokenize them and bind to each one of them..
        char *b;
        int ret = 0;
        char *list = strdup(settings.inter);
        //.......
        ret |= server_socket(p, the_port, transport, portnumber_file);
        //....... 
        return ret;
    }
}

static int server_socket(const char *interface,
                         int port,
                         enum network_transport transport,
                         FILE *portnumber_file) {
    //.......
    for (next= ai; next; next= next->ai_next) {
        //.......
        conn *listen_conn_add;
        //服务端监听fd的生成
        if ((sfd = new_socket(next)) == -1) {
            //.......
            setsockopt(sfd,...);
            bind(sfd,...);
            listen(sfd,...);
            //至此完成了服务端socket参数设置、bind、listen等工作
            //接下来就是将该sfd加入到libevent实例中进行"客户端连接"事件的监听
            //.......
            //该函数功能执行相关libevent配置,以及针对该套接字句柄作一些相关资源的配置
            conn_new(sfd, conn_listening, EV_READ | EV_PERSIST, 1, transport, main_base)));
            //.......
        }
    }
    //.......
}

3.work线程

1.监听来自客户端连接的可读事件
2.获取客户端的操作请求,完成具体业务逻辑处理功能

//work线程初始入口函数
void memcached_thread_init(int nthreads, void *arg) {
    //.......
    //nthreads work线程数目
    //threads 是LIBEVENT_THREAD[nthreads]数组,每个work线程都有个自己对应的LIBEVENT_THREAD
    //是LIBEVENT_THREAD结构体保存了当前线程相关资源,例如:libevent实例,套接字fd队列等
    threads = calloc(nthreads, sizeof(LIBEVENT_THREAD));

    for (i = 0; i < nthreads; i++) {
        //for循环为每个线程生成一个管道
        int fds[2];
        //管道初始化
        if (pipe(fds)) {
            perror("Can't create notify pipe");
            exit(1);
        }
        //第i号work线程的接收管道
        threads[i].notify_receive_fd = fds[0];
        //第i号work线程的发送管道
        threads[i].notify_send_fd = fds[1];
        //.......
        //线程资源初始化,主要是线程的libevent实例初始化,套接字fd队列等资源初始化
        setup_thread(&threads[i]);  
        //.......
    }
    //.......
    for (i = 0; i < nthreads; i++) {
        //创建work线程
        //worker_libevent是work线程函数
        create_worker(worker_libevent, &threads[i]);
    }
}

static void setup_thread(LIBEVENT_THREAD *me) {
#if defined(LIBEVENT_VERSION_NUMBER) && LIBEVENT_VERSION_NUMBER >= 0x02000101
    struct event_config *ev_config;
    ev_config = event_config_new();
    event_config_set_flag(ev_config, EVENT_BASE_FLAG_NOLOCK);
    me->base = event_base_new_with_config(ev_config);
    event_config_free(ev_config);
#else
    me->base = event_init();
#endif

    //me->base 将一个libevent实例保存到LIBEVENT_THREAD中
    //每个线程都维护着自己的libevent实例,用于监听文件句柄事件的发生
    //me->base这个libevent实例主要用于监听"客户端fd连接"是否有可读事件
    //.......    
    //notify_event也是一个libevent实例,主要用来监听当前work线程是否有"管道事件"
    //当主线程向me->notify_send_fd管道写'c'的时候
    //me->notify_receive_fd接收管道有数据可读,那么就会触发thread_libevent_process回调函数
    //thread_libevent_process管道回调函数
    event_set(&me->notify_event, me->notify_receive_fd,
              EV_READ | EV_PERSIST, thread_libevent_process, me);
    event_base_set(me->base, &me->notify_event);

    if (event_add(&me->notify_event, 0) == -1) {
        fprintf(stderr, "Can't monitor libevent notify pipe\n");
        exit(1);
    }
    //.......
    //线程套接字fd队列资源初始化
    me->new_conn_queue = malloc(sizeof(struct conn_queue));
    if (me->new_conn_queue == NULL) {
        perror("Failed to allocate memory for connection queue");
        exit(EXIT_FAILURE);
    }
    cq_init(me->new_conn_queue);

    //.......
}

//创建线程work线程函数
typedef void *(*func)(void *)  pfunc;
static void create_worker(pfunc func, void *arg) {
    pthread_attr_t  attr;
    int             ret;

    pthread_attr_init(&attr);

    if ((ret = pthread_create(&((LIBEVENT_THREAD*)arg)->thread_id, &attr, func, arg)) != 0) {
        fprintf(stderr, "Can't create thread: %s\n",
                strerror(ret));
        exit(1);
    }
}

/*
 * Worker thread: main event loop
 */
static void *worker_libevent(void *arg) {
    LIBEVENT_THREAD *me = arg;
    //.......
    //libevent事件循环监听,如果有事件发生,就会触发对应的业务回调函数
    event_base_loop(me->base, 0);

    event_base_free(me->base);
    return NULL;
}

thread_libevent_process管道回调函数,例如:当主线程监听到有客户端连接过滤,主线程accept之后返回该客户端的fd句柄,主线程不会将fd直接加入到work线程的libevent中进行可读事件的监听,而是分两步进行,第一步:将fd封装一个item节点,将item节点放入到一个work线程的item队列中。第二步:通过管道写入'c'消息,通知对应的work线程到它自己的item队列中取item节点,然后加入到自己的libevent中对fd进行可读事件的监听。而这个thread_libevent_process就是管道可读事件的回调触发函数。

//管道事件回调函数
static void thread_libevent_process(int fd, short which, void *arg) {
    //线程资源
    LIBEVENT_THREAD *me = arg;
    CQ_ITEM *item;
    char buf[1];
    conn *c;
    unsigned int timeout_fd;
    //notify_receive_fd读管道读1个字节数据
    if (read(fd, buf, 1) != 1) {
        if (settings.verbose > 0)
            fprintf(stderr, "Can't read from libevent pipe\n");
        return;
    }

    switch (buf[0]) {
    case 'c':
        //'c'消息就是主线程将客户端新连接加到work线程new_conn_queue队列中,并通过notify_send_fd通知work线程去取
        item = cq_pop(me->new_conn_queue);
        //item 客户端fd封装结构

        if (NULL == item) {
            break;
        }
        switch (item->mode) {
            case queue_new_conn:
                //item->init_state = conn_new_cmd 新连接
                c = conn_new(item->sfd, item->init_state, item->event_flags,
                                   item->read_buffer_size, item->transport,
                                   me->base);
                if (c == NULL) {
                    //.......
                } else {
                    //c->thread保存work线程LIBEVENT_THREAD资源
                    //这样fd会话连接就可以访问该work主线程的LIBEVENT_THREAD相关资源
                    c->thread = me;
                }
                break;

            case queue_redispatch:
                conn_worker_readd(item->c);
                break;
        }
        cqi_free(item);
        break;
    /* we were told to pause and report in */
    case 'p':
        register_thread_initialized();
        break;
    /* a client socket timed out */
    case 't':
        //超时线程的通知消息
        //超时线程会定时扫描conns连接会话,如果发现某个连接会话超时了,
        //那么就会写't'消息通知该会话对应的work线程关闭该会话
        if (read(fd, &timeout_fd, sizeof(timeout_fd)) != sizeof(timeout_fd)) {
            if (settings.verbose > 0)
                fprintf(stderr, "Can't read timeout fd from libevent pipe\n");
            return;
        }
        //关闭会话
        conn_close_idle(conns[timeout_fd]);
        break;
    }
}

1.conn_new主要是针对新连接会话做资源分配与初始化,以及将fd套接字加入到libevent中进行监听.在主线程和work线程中都有涉及
2.drive_machine是一个非常重要的函数,它内部会根据conn连接的具体状态,选择执行相应的业务处理逻辑,我们这里可以理解为一个状态机

conn *conn_new(const int sfd, enum conn_states init_state,
                const int event_flags,
                const int read_buffer_size, enum network_transport transport,
                struct event_base *base) {
    //conn保存了sfd连接的相关信息,我在这里将其理解为一个会话
    //conns[MAX]数组就是维护着当前系统所有的连接会话
    conn *c;
    c = conns[sfd];

    if (NULL == c) {
        c = (conn *)calloc(1, sizeof(conn));
        //.......
        c->sfd = sfd;
        conns[sfd] = c;
    }
    //事件回调函数event_handler,通过c->state状态值调用相应的业务逻辑处理
    c->state = init_state;
    //.......
    //将套接字句柄sfd加入到libevent实例中进行监听
    //当sfd套接字有可读事件发生的时候 libevent会回调event_handler函数
    //回调函数event_handler就是具体的业务逻辑处理
    event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
    event_base_set(base, &c->event);
    event_add(&c->event, 0);
    //.......
    return c;
}

//事件回调函数
void event_handler(const int fd, const short which, void *arg) {
    conn *c;
    c = (conn *)arg;
    //.......
    //状态机
    drive_machine(c);
    //.......
}

static void drive_machine(conn *c) {

    bool stop = false;
    int sfd;
    //.......
    while (!stop) {
        switch(c->state) {
            case conn_listening:
                //.......
                sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen);
                //.......
                //服务端accept一个新的客户端连接句柄sfd
                //将该sfd通过轮训的方式分发到对应work线程的fd队列中,然后通过管道通知
                //对应work线程到对应fd队列中去取客户端连接句柄
                //work线程获取该sfd后将其加入work线程的libevent进行监听,work线程libevent主要监听sfd是否有可读数据
                dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST,
                                    DATA_BUFFER_SIZE, c->transport);
                stop = true;
                break;
            case conn_waiting:
                //.......
                break;
            case conn_read:
                //.......
                break;
            case conn_parse_cmd:
                //.......
                break;
            case conn_new_cmd:
                //.......
                reset_cmd_handler(c);
                //.......
                break;
            case conn_nread:
                //.......
                break;
            //.......
            case conn_max_state:
                //.......
                break;
        }
    }
    //.......
}

//分发一个新的连接到其他线程
void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
                       int read_buffer_size, enum network_transport transport) {

    //将sfd封装到CQ_ITEM节点中
    CQ_ITEM *item = cqi_new();
    char buf[1];

    //通过轮训的方式确定一个work线程的编号
    //settings.num_threads work线程数目
    int tid = (last_thread + 1) % settings.num_threads;

    //LIBEVENT_THREAD 该结构保存了线程相关资源
    //threads全局变量
    //threads + tid 对应线程相关资源
    LIBEVENT_THREAD *thread = threads + tid;

    last_thread = tid;

    item->sfd = sfd;
    //init_state = conn_new_cmd
    //新连接
    item->init_state = init_state;
    item->event_flags = event_flags;
    item->read_buffer_size = read_buffer_size;
    item->transport = transport;
    item->mode = queue_new_conn;

    //将封装好sfd的节点push到对应work线程fd队列中
    cq_push(thread->new_conn_queue, item);

    MEMCACHED_CONN_DISPATCH(sfd, thread->thread_id);

    //’c'新连接标志
    buf[0] = 'c';

    //将标志写入notify_send_fd管道
    //work线程发现notify_receive_fd管道有数据发送过来,就会触发管道事件的回调函数
    if (write(thread->notify_send_fd, buf, 1) != 1) {
        perror("Writing to thread notify pipe");
    }
}

static void reset_cmd_handler(conn *c) {
    //.......
    if (c->rbytes > 0) {
        //已经读取了客户端的数据,那么设置状态为conn_parse_cmd, 
        //提示状态机drive_machine下一步执行conn_parse_cmd逻辑处理
        conn_set_state(c, conn_parse_cmd);
    } else {
        //还未读取数据,是一个新连接
        //将状态设置为conn_waiting,提示状态机下一步执行conn_waiting逻辑处理
        //conn_waiting中执行了修改update_event(c, EV_READ | EV_PERSIST),用于监听可读事件
        conn_set_state(c, conn_waiting);
    }
    //.......
}

相关文章

网友评论

    本文标题:memcached源码分析-网络模块

    本文链接:https://www.haomeiwen.com/subject/eigxgqtx.html