epoll_create
#include <sys/epoll.h>
int epoll_create(int size);
- 返回值:
success:返回一个非0 的未使用过的最小的文件描述符
error:-1 errno被设置
创建一个epoll实例,返回一个指向该实例的文件描述符,size用来告诉内核这个监听的数目一共有多大。
需要注意的是,当创建好epoll句柄后,它就是会占用一个fd值,在linux下查看/proc/进程id/fd/,是能够看到这个fd的( eg: ls /proc/$(ps -aux | grep './main' | awk 'NR==1 { print $2 }')/fd
),所以在使用完epoll后,必须调用close()关闭,否则可能导致fd被耗尽。
- epoll_create1
int epoll_create1(int flags); - flags: - 如果这个参数是0,这个函数等价于epoll_create(0) - EPOLL_CLOEXEC:这是这个参数唯一的有效值,如果这个参数设置为这个。 那么当进程替换映像的时候会关闭这个文件描述符,这样新的映像中就无法对这个文件描述符操作, 适用于多进程编程+映像替换的环境里
epoll_ctl
#include <sys/epoll.h>
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
epoll的事件注册函数
- 第一个参数是epoll_create()的创建的epoll实例。
- 第二个参数表示动作,用3个宏表示:
EPOLL_CTL_ADD:注册新的fd到epfd中 EPOLL_CTL_MOD:修改已经注册的fd的监听事件 EPOLL_CTL_DEL:从epfd中删除一个fd
- 第三个参数是需要监听的fd
- 第四个参数是内核需要监听什么事件,struct epoll_event结构如下:
#include <sys/epoll.h> struct epoll_event { __uint32_t events; epoll_data_t data; } typedef union epoll_data { void *ptr; int fd; __uint32_t u32; __uint64_t u64; } epoll_data_t;
- EPOLLIN :表示对应的文件描述符可以读(包括对端SOCKET正常关闭)
- EPOLLOUT:表示对应的文件描述符可以写
- EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来)
- EPOLLERR:表示对应的文件描述符发生错误
- EPOLLHUP:表示对应的文件描述符被挂断
- EPOLLET: 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的
- EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里
epoll_wait
#include <sys/epoll.h>
int epoll_wait(int epfd, struct epoll_event *event, int maxevents, int timeout);
等待事件的产生。
-
参数events用来从内核得到事件的集合
-
maxevents告之内核这个events有多大(数组成员的个数),这个maxevents的值不能大于创建epoll_create()时的size。
-
参数timeout是超时时间,单位毫秒(0会立即返回,-1将是永久阻塞)。
该函数返回需要处理的事件数目,返回的事件集合在events数组中,如返回0表示已超时。
工作模式
epoll对文件描述符的操作有2种模式:LT和ET。
-
LT模式:
LT(水平触发)是默认的工作模式,并且同时支持阻塞和非阻塞socket。
对于采用LT工作模式的文件描述符,当epoll_wait检测到其上有时间发生并将此事件通知应用程序后,应用程序可以不立即处理该事件,这样,当应用程序下一次调用epoll_wait时,epoll_wait还会再次相应应用程序通告此事件,直到该事件被处理。 -
ET模式:
ET(Edge Trigger,边沿触发)是一种高效的模式,只支持非阻塞socket。
对于采用ET模式的文件描述符,当epoll_wait检测到其上有事件发生并将此时间通知应用程序以后,应用程序可以不立即处理该事件,但是后续的epoll_wait将不再向应用程序通知这一事件。
为什么将ET称为高效的工作方式了?就是因为ET不用多次触发,减少了每次epoll_wait可能需要返回的fd数量,在并发event数量极多的情况下能够加快epoll_wait的处理速度。
注意epoll工作在ET模式的时候,必须使用非阻塞套接口,以避免由于一个文件句柄的阻塞读/阻塞写操作把处理多个文件描述符的任务饿死。 -
server
#include <sys/types.h> #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> #include <assert.h> #include <stdio.h> #include <unistd.h> #include <errno.h> #include <string.h> #include <fcntl.h> #include <stdlib.h> #include <sys/epoll.h> #include <pthread.h> #include <stdbool.h> #define PORT "8888" #define MAX_EVENT_NUMBER 1024 #define BUFFER_SIZE 10 int setnonblocking(int fd); void addfd(int epollfd, int fd, bool enable_et); void lt(struct epoll_event* events, int number, int epollfd, int listenfd ) { char buf[BUFFER_SIZE]; for (int i = 0; i < number; i++) { int sockfd = events[i].data.fd; if (sockfd == listenfd) { struct sockaddr_in client_address; socklen_t client_addrlength = sizeof( client_address ); int connfd = accept(listenfd, (struct sockaddr* )&client_address, &client_addrlength ); addfd(epollfd, connfd, false); // // epoll event set LT } else if (events[i].events & EPOLLIN) { printf("event trigger once\n"); memset(buf, '\0', BUFFER_SIZE); int ret = recv(sockfd, buf, BUFFER_SIZE-1, 0); if( ret <= 0 ) { close( sockfd ); continue; } printf("get %d bytes of content: %s\n", ret, buf); } else { printf("something else happened \n"); } } } void et(struct epoll_event* events, int number, int epollfd, int listenfd) { char buf[ BUFFER_SIZE ]; for (int i = 0; i < number; i++) { int sockfd = events[i].data.fd; if (sockfd == listenfd) { struct sockaddr_in client_address; socklen_t client_addrlength = sizeof(client_address); int connfd = accept(listenfd, (struct sockaddr*)&client_address, &client_addrlength); addfd(epollfd, connfd, true); // epoll event set ET } else if (events[i].events & EPOLLIN) { printf("event trigger once\n"); /* 因为ET模式只触发一次,所以使用循环确保数据全部接受*/ while(1) { memset(buf, '\0', BUFFER_SIZE); int ret = recv(sockfd, buf, BUFFER_SIZE-1, 0); if(ret < 0) { /* 下面if条件成立,则读缓冲区数据已经读取完成*/ if( (errno == EAGAIN) || (errno == EWOULDBLOCK) ) { printf( "read later\n" ); break; } close(sockfd); break; } else if(ret == 0) { close(sockfd); } else { printf("get %d bytes of content: %s\n", ret, buf); } } } } } int main(int argc, char* argv[]) { int ret = 0; const char* ip = "127.0.0.1"; int port = atoi(PORT); struct sockaddr_in address; memset(&address, sizeof(address), 0); address.sin_family = AF_INET; inet_pton(AF_INET, ip, &address.sin_addr); address.sin_port = htons(port); int listenfd = socket(PF_INET, SOCK_STREAM, 0); assert(listenfd >= 0); ret = bind(listenfd, (struct sockaddr*)&address, sizeof(address)); assert(ret != -1); ret = listen(listenfd, 5); assert(ret != -1); struct epoll_event events[MAX_EVENT_NUMBER]; int epollfd = epoll_create(5); assert(epollfd != -1); addfd(epollfd, listenfd, true); // listenfd is ET while(true) { int ret = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1); if (ret < 0) { printf("epoll failure\n"); break; } //lt(events, ret, epollfd, listenfd); // epoll event default mode is LT. et(events, ret, epollfd, listenfd); } close(listenfd); return 0; } int setnonblocking(int fd) { int old_option = fcntl(fd, F_GETFL); int new_option = old_option | O_NONBLOCK; fcntl(fd, F_SETFL, new_option); return old_option; } void addfd(int epollfd, int fd, bool enable_et) { setnonblocking(fd); struct epoll_event event; event.data.fd = fd; event.events = EPOLLIN; if(enable_et) event.events |= EPOLLET; epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event); }
-
client
#include <sys/types.h> #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> #include <assert.h> #include <stdio.h> #include <unistd.h> #include <errno.h> #include <string.h> #include <stdlib.h> #include <signal.h> #define PORT 8888 #define MAX_EVENT_NUMBER 1024 #define BUFFER_SIZE 1000 void sig_handle(int sig) { printf("catch a sigpipe signal\n"); printf("server have close\n"); exit(0); } int main( int argc, char* argv[] ) { const char* ip = "127.0.0.1"; int ret = 0; char buf[1024]; struct sockaddr_in address; memset(&address, sizeof(address), 0); address.sin_family = AF_INET; inet_pton(AF_INET, ip, &address.sin_addr); address.sin_port = htons(PORT); signal(SIGPIPE, sig_handle); int listenfd = socket(PF_INET, SOCK_STREAM, 0); assert( listenfd >= 0 ); while(1) { int ret = connect(listenfd,(struct sockaddr *)&address,sizeof(struct sockaddr_in)); if (ret < 0) { printf( "connect failure\n" ); sleep(1); continue; } break; } while(1) { memset(buf, 0, sizeof(buf)); fgets(buf, BUFFER_SIZE, stdin); printf("len: %ld:%s\n", strlen(buf), buf); send(listenfd, buf, strlen(buf), 0); } close( listenfd ); return 0; }
当使用LT模式时:
LT.jpg当使用ET模式时:
ET.jpg参考资料
[1]《UNIX 网络编程》3th [美] W.Richard Stevens,Bill Fenner,Andrew M. Rudoff
[2] http://www.cnblogs.com/ajianbeyourself/p/5859989.html
[3] https://blog.csdn.net/hnlyyk/article/details/50946194
网友评论