![](https://img.haomeiwen.com/i17293320/b0dc385c7a8814b5.jpg)
1.概述
memcached网络模块是基于libevent库开发的,主要分为两个模块:连接监听线程,工作线程。连接监听线程是用来监听来自客户端连接的,工作线程主要是用来完成具体业务逻辑处理。
网络模块模型
![]()
网络模块时序图
![]()
2.连接监听线程(主线程)
1.初始化主线程libevent实例,用于监听来自客户端的连接
2.work线程的资源分配与初始化
int main (int argc, char **argv) {
//.......
/* initialize main thread libevent instance */
#if defined(LIBEVENT_VERSION_NUMBER) && LIBEVENT_VERSION_NUMBER >= 0x02000101
/* If libevent version is larger/equal to 2.0.2-alpha, use newer version */
struct event_config *ev_config;
ev_config = event_config_new();
event_config_set_flag(ev_config, EVENT_BASE_FLAG_NOLOCK);
main_base = event_base_new_with_config(ev_config);
event_config_free(ev_config);
#else
/* Otherwise, use older API */
main_base = event_init();
//main_base 主线程的libevent实例,主要用于监听来自客户端连接
#endif
//.......
//work线程相关初始化
memcached_thread_init(settings.num_threads, storage);
//.......
errno = 0;
//服务端socket绑定ip与端口进行连接监听
if (settings.port && server_sockets(settings.port, tcp_transport,
portnumber_file)) {
vperror("failed to listen on TCP port %d", settings.port);
exit(EX_OSERR);
}
//.......
/* enter the event loop */
if (event_base_loop(main_base, 0) != 0) {
retval = EXIT_FAILURE;
}
//.......
return retval
}
server_sockets根据配置信息开启服务端socket的监听
static int server_sockets(int port, enum network_transport transport,
FILE *portnumber_file) {
if (settings.inter == NULL) {
return server_socket(settings.inter, port, transport, portnumber_file);
}else{
// tokenize them and bind to each one of them..
char *b;
int ret = 0;
char *list = strdup(settings.inter);
//.......
ret |= server_socket(p, the_port, transport, portnumber_file);
//.......
return ret;
}
}
static int server_socket(const char *interface,
int port,
enum network_transport transport,
FILE *portnumber_file) {
//.......
for (next= ai; next; next= next->ai_next) {
//.......
conn *listen_conn_add;
//服务端监听fd的生成
if ((sfd = new_socket(next)) == -1) {
//.......
setsockopt(sfd,...);
bind(sfd,...);
listen(sfd,...);
//至此完成了服务端socket参数设置、bind、listen等工作
//接下来就是将该sfd加入到libevent实例中进行"客户端连接"事件的监听
//.......
//该函数功能执行相关libevent配置,以及针对该套接字句柄作一些相关资源的配置
conn_new(sfd, conn_listening, EV_READ | EV_PERSIST, 1, transport, main_base)));
//.......
}
}
//.......
}
3.work线程
1.监听来自客户端连接的可读事件
2.获取客户端的操作请求,完成具体业务逻辑处理功能
//work线程初始入口函数
void memcached_thread_init(int nthreads, void *arg) {
//.......
//nthreads work线程数目
//threads 是LIBEVENT_THREAD[nthreads]数组,每个work线程都有个自己对应的LIBEVENT_THREAD
//是LIBEVENT_THREAD结构体保存了当前线程相关资源,例如:libevent实例,套接字fd队列等
threads = calloc(nthreads, sizeof(LIBEVENT_THREAD));
for (i = 0; i < nthreads; i++) {
//for循环为每个线程生成一个管道
int fds[2];
//管道初始化
if (pipe(fds)) {
perror("Can't create notify pipe");
exit(1);
}
//第i号work线程的接收管道
threads[i].notify_receive_fd = fds[0];
//第i号work线程的发送管道
threads[i].notify_send_fd = fds[1];
//.......
//线程资源初始化,主要是线程的libevent实例初始化,套接字fd队列等资源初始化
setup_thread(&threads[i]);
//.......
}
//.......
for (i = 0; i < nthreads; i++) {
//创建work线程
//worker_libevent是work线程函数
create_worker(worker_libevent, &threads[i]);
}
}
static void setup_thread(LIBEVENT_THREAD *me) {
#if defined(LIBEVENT_VERSION_NUMBER) && LIBEVENT_VERSION_NUMBER >= 0x02000101
struct event_config *ev_config;
ev_config = event_config_new();
event_config_set_flag(ev_config, EVENT_BASE_FLAG_NOLOCK);
me->base = event_base_new_with_config(ev_config);
event_config_free(ev_config);
#else
me->base = event_init();
#endif
//me->base 将一个libevent实例保存到LIBEVENT_THREAD中
//每个线程都维护着自己的libevent实例,用于监听文件句柄事件的发生
//me->base这个libevent实例主要用于监听"客户端fd连接"是否有可读事件
//.......
//notify_event也是一个libevent实例,主要用来监听当前work线程是否有"管道事件"
//当主线程向me->notify_send_fd管道写'c'的时候
//me->notify_receive_fd接收管道有数据可读,那么就会触发thread_libevent_process回调函数
//thread_libevent_process管道回调函数
event_set(&me->notify_event, me->notify_receive_fd,
EV_READ | EV_PERSIST, thread_libevent_process, me);
event_base_set(me->base, &me->notify_event);
if (event_add(&me->notify_event, 0) == -1) {
fprintf(stderr, "Can't monitor libevent notify pipe\n");
exit(1);
}
//.......
//线程套接字fd队列资源初始化
me->new_conn_queue = malloc(sizeof(struct conn_queue));
if (me->new_conn_queue == NULL) {
perror("Failed to allocate memory for connection queue");
exit(EXIT_FAILURE);
}
cq_init(me->new_conn_queue);
//.......
}
//创建线程work线程函数
typedef void *(*func)(void *) pfunc;
static void create_worker(pfunc func, void *arg) {
pthread_attr_t attr;
int ret;
pthread_attr_init(&attr);
if ((ret = pthread_create(&((LIBEVENT_THREAD*)arg)->thread_id, &attr, func, arg)) != 0) {
fprintf(stderr, "Can't create thread: %s\n",
strerror(ret));
exit(1);
}
}
/*
* Worker thread: main event loop
*/
static void *worker_libevent(void *arg) {
LIBEVENT_THREAD *me = arg;
//.......
//libevent事件循环监听,如果有事件发生,就会触发对应的业务回调函数
event_base_loop(me->base, 0);
event_base_free(me->base);
return NULL;
}
thread_libevent_process管道回调函数,例如:当主线程监听到有客户端连接过滤,主线程accept之后返回该客户端的fd句柄,主线程不会将fd直接加入到work线程的libevent中进行可读事件的监听,而是分两步进行,第一步:将fd封装一个item节点,将item节点放入到一个work线程的item队列中。第二步:通过管道写入'c'消息,通知对应的work线程到它自己的item队列中取item节点,然后加入到自己的libevent中对fd进行可读事件的监听。而这个thread_libevent_process就是管道可读事件的回调触发函数。
//管道事件回调函数
static void thread_libevent_process(int fd, short which, void *arg) {
//线程资源
LIBEVENT_THREAD *me = arg;
CQ_ITEM *item;
char buf[1];
conn *c;
unsigned int timeout_fd;
//notify_receive_fd读管道读1个字节数据
if (read(fd, buf, 1) != 1) {
if (settings.verbose > 0)
fprintf(stderr, "Can't read from libevent pipe\n");
return;
}
switch (buf[0]) {
case 'c':
//'c'消息就是主线程将客户端新连接加到work线程new_conn_queue队列中,并通过notify_send_fd通知work线程去取
item = cq_pop(me->new_conn_queue);
//item 客户端fd封装结构
if (NULL == item) {
break;
}
switch (item->mode) {
case queue_new_conn:
//item->init_state = conn_new_cmd 新连接
c = conn_new(item->sfd, item->init_state, item->event_flags,
item->read_buffer_size, item->transport,
me->base);
if (c == NULL) {
//.......
} else {
//c->thread保存work线程LIBEVENT_THREAD资源
//这样fd会话连接就可以访问该work主线程的LIBEVENT_THREAD相关资源
c->thread = me;
}
break;
case queue_redispatch:
conn_worker_readd(item->c);
break;
}
cqi_free(item);
break;
/* we were told to pause and report in */
case 'p':
register_thread_initialized();
break;
/* a client socket timed out */
case 't':
//超时线程的通知消息
//超时线程会定时扫描conns连接会话,如果发现某个连接会话超时了,
//那么就会写't'消息通知该会话对应的work线程关闭该会话
if (read(fd, &timeout_fd, sizeof(timeout_fd)) != sizeof(timeout_fd)) {
if (settings.verbose > 0)
fprintf(stderr, "Can't read timeout fd from libevent pipe\n");
return;
}
//关闭会话
conn_close_idle(conns[timeout_fd]);
break;
}
}
1.conn_new主要是针对新连接会话做资源分配与初始化,以及将fd套接字加入到libevent中进行监听.在主线程和work线程中都有涉及
2.drive_machine是一个非常重要的函数,它内部会根据conn
连接的具体状态,选择执行相应的业务处理逻辑,我们这里可以理解为一个状态机
conn *conn_new(const int sfd, enum conn_states init_state,
const int event_flags,
const int read_buffer_size, enum network_transport transport,
struct event_base *base) {
//conn保存了sfd连接的相关信息,我在这里将其理解为一个会话
//conns[MAX]数组就是维护着当前系统所有的连接会话
conn *c;
c = conns[sfd];
if (NULL == c) {
c = (conn *)calloc(1, sizeof(conn));
//.......
c->sfd = sfd;
conns[sfd] = c;
}
//事件回调函数event_handler,通过c->state状态值调用相应的业务逻辑处理
c->state = init_state;
//.......
//将套接字句柄sfd加入到libevent实例中进行监听
//当sfd套接字有可读事件发生的时候 libevent会回调event_handler函数
//回调函数event_handler就是具体的业务逻辑处理
event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
event_base_set(base, &c->event);
event_add(&c->event, 0);
//.......
return c;
}
//事件回调函数
void event_handler(const int fd, const short which, void *arg) {
conn *c;
c = (conn *)arg;
//.......
//状态机
drive_machine(c);
//.......
}
static void drive_machine(conn *c) {
bool stop = false;
int sfd;
//.......
while (!stop) {
switch(c->state) {
case conn_listening:
//.......
sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen);
//.......
//服务端accept一个新的客户端连接句柄sfd
//将该sfd通过轮训的方式分发到对应work线程的fd队列中,然后通过管道通知
//对应work线程到对应fd队列中去取客户端连接句柄
//work线程获取该sfd后将其加入work线程的libevent进行监听,work线程libevent主要监听sfd是否有可读数据
dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST,
DATA_BUFFER_SIZE, c->transport);
stop = true;
break;
case conn_waiting:
//.......
break;
case conn_read:
//.......
break;
case conn_parse_cmd:
//.......
break;
case conn_new_cmd:
//.......
reset_cmd_handler(c);
//.......
break;
case conn_nread:
//.......
break;
//.......
case conn_max_state:
//.......
break;
}
}
//.......
}
//分发一个新的连接到其他线程
void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
int read_buffer_size, enum network_transport transport) {
//将sfd封装到CQ_ITEM节点中
CQ_ITEM *item = cqi_new();
char buf[1];
//通过轮训的方式确定一个work线程的编号
//settings.num_threads work线程数目
int tid = (last_thread + 1) % settings.num_threads;
//LIBEVENT_THREAD 该结构保存了线程相关资源
//threads全局变量
//threads + tid 对应线程相关资源
LIBEVENT_THREAD *thread = threads + tid;
last_thread = tid;
item->sfd = sfd;
//init_state = conn_new_cmd
//新连接
item->init_state = init_state;
item->event_flags = event_flags;
item->read_buffer_size = read_buffer_size;
item->transport = transport;
item->mode = queue_new_conn;
//将封装好sfd的节点push到对应work线程fd队列中
cq_push(thread->new_conn_queue, item);
MEMCACHED_CONN_DISPATCH(sfd, thread->thread_id);
//’c'新连接标志
buf[0] = 'c';
//将标志写入notify_send_fd管道
//work线程发现notify_receive_fd管道有数据发送过来,就会触发管道事件的回调函数
if (write(thread->notify_send_fd, buf, 1) != 1) {
perror("Writing to thread notify pipe");
}
}
static void reset_cmd_handler(conn *c) {
//.......
if (c->rbytes > 0) {
//已经读取了客户端的数据,那么设置状态为conn_parse_cmd,
//提示状态机drive_machine下一步执行conn_parse_cmd逻辑处理
conn_set_state(c, conn_parse_cmd);
} else {
//还未读取数据,是一个新连接
//将状态设置为conn_waiting,提示状态机下一步执行conn_waiting逻辑处理
//conn_waiting中执行了修改update_event(c, EV_READ | EV_PERSIST),用于监听可读事件
conn_set_state(c, conn_waiting);
}
//.......
}
网友评论