为了防止服务器accept()后阻塞等客户端连接或阻塞等客户端的写入, 将这个工作交给内核去做, 以提高效率, 这叫做多路I/O复用模型.
select()
int select(int nfds, fd_set *readfds, fd_set *writefds,
fd_set *exceptfds, struct timeval *timeout);
先声明fd_set readfds;
, 然后把文件集作为传入传出参数传入, 当select()返回时readfds的位图可能已发生变化(客户端写入完毕要读取了), 需挨个用FD_ISSET()判断fd是否仍在readfds中.
参数1: nfds, 代表所有监听的文件描述符中, 最大的文件描述符+1
参数2: readfds, 所监听的文件描述符"可读"事件
参数3: writefds, 所监听的文件描述符"可写"事件
参数4: exceptfds, 所监听的文件描述符"异常"事件
参数5: 设置未发生改变时最大等待时间
返回值: 成功: 所监听的所有监听集合中, 满足条件的总数(读+写+异常), 只有监听的集合至少有一个满足了才会返回; 失败返回-1设置errno.
有4个函数控制fd_set(二进制位图类型)的状态:
void FD_ZERO(fd_set *set);
将set清空为0
void FD_SET(int fd, fd_set *set);
将fd设置到set集合中, 对应位置为1
void FD_CLR(int fd, fd_set *set);
将fd从set中清除出去, 对应位置为0
int FD_ISSET(int fd, fd_set *set);
判断fd是否在集合中
在socket未建立连接时, 只需要监听初始listen_sockfd的读事件(读建立连接请求), 当接收到建立连接的请求后, 服务器再调用accept()处理(无需阻塞), 同时返回新的sockfd监听读写事件.
select缺点:
1.select受文件描述符数量限制, 只能监听小于1024个客户端. (改ulimit -a都没用)
2.只能挨个fd循环检查, 当fd间隔过大时, 过于耗时. 需要维护一个数组存放要监听的fd.
3.fd_set作为传入传出参数, 会被select修改, 导致用户设置的初始监听状态不保存, 需要手动备份一下该fd_set.
在每次调用select()之前, 先把包含listenfd和所有当前客户端fd的allset复制给readset, 之后readset作为select()的传入传出参数, 如果某个客户端fd没有写入则自动将其从readset中移除, 并返回所有fd_set中发生变化的数量nready. 之后, 对于listen_fd要特意检查, 如果有新客户端建立连接(readset中listenfd已发生变化), 将其fd添加到allset和自定义数组(初始值全为-1)中以待下次监听.
之后, 处理完listenfd新建立的连接后, 挨个检查readset中每一个fd是否仍存在(即已发生变化), 并对变化的进行read()操作. 此时的readset是上一次的allset, 不包含新添加的连接, 所以是逐个比较判断上次建立过的连接这次是否发生变化(写入).
poll()
poll()相对于select()的改进:
1.修改ulimit -a突破1024个文件描述符限制
2.监听和返回文件集分离, 不用再复用传入传出参数
3.传入参数为事件数组, 可以缩小遍历范围, 不用全遍历一遍
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
&fds是结构体数组首地址, nfds是数组元素数量, timeout传-1阻塞等/0立即返回/大于0等待指定秒数. 返回值同select()仍是发生变化的数量, 如果想同时监听一个fd的读和写就把数组两个结构体元素的fd赋值成同一个fd.
struct pollfd fds[5000]; // 修改ulimit -a突破1024
fds[0].fd = listenfd;
fds[0].events = POLLIN/POLLOUT/POLLERR
fds[0].revents = 0 // 由系统自动返回, 可以不用初始化
fds[1].fd = fd1;
fds[1].events = POLLIN/POLLOUT/POLLERR
poll(fds, 5, -1) // 传入数量是数组实际初始化过的数量, 不是最大数量
使用if(fds[i].revents & POLLIN)
判断返回事件是否符合条件.
如果read()返回-1, 可能是服务器断了(收到SYN没返回ACK), 也可能是客户端断了. 使用errno==ECONNRESET
判断是否是服务器没有返回ACK, 如果服务端断了要把sockfd关闭, 并把数组中fd置为-1, 重新三次握手. 如果客户端断了直接perror, exit()退出.
epoll()
epoll相对于poll的改进:
1.无需遍历整个event数组, 只需要遍历活跃的事件数组, 甚至不用遍历(只监听1个), 直接回调.
2.可以直接使用回调函数加速处理
3.内部的mmap与用户空间交换速率更高
4.ET搭配非阻塞I/O效率比LT更高
在大量连接数和少量监听数的情况下, 使用epoll()才有效果. 如果3-1023的大部分都监听, 和select()差不多.
malloc()和mmap()底层都是调linux的kmalloc().
int epoll_create(int num)
的参数只是建议值, 表示要监听的文件描述符个数(红黑树节点数), 返回红黑树根节点为epfd(第一个可用的fd). 红黑树是平衡二叉树一种, 其左右子树高度差小于1, 使用二分查找最快.
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
第一个参数是epoll_create()返回的epfd根节点, 第二个参数是要采取的行动, 有EPOLL_CTL_ADD
, EPOLL_CTL_MOD
, EPOLL_CTL_DEL
三种. 第三个参数表示要操作的文件描述符, 第四个参数的结构体有事件类型和union联合体(指针/数据)两种成员, 事件类型有EPOLLIN/EPOLLOUT/EPOLLERR, 联合体内的fd和第三个参数传一样的值. 联合体内的泛型指针也可以传包含函数指针的结构体指针进去, 这样就可以直接调用事件的处理函数(回调函数).
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
这里的events是一个数组, 传出参数, 表示满足条件的fd. maxevents是数组大小, timeout同上-1阻塞/0非阻塞/等待毫秒数. 成功返回有多少个fd符合条件.
int epfd = epoll_create(100);
struct epoll_event events;
events.events = EPOLLIN; // LT水平触发(默认)
// events.events = EPOLLIN | EPOLLET; // ET边沿触发
event.data.fd = lfd;
epoll_ctl(epfd, EPOLL_CTL_ADD, lfd, &events);
struct epoll_event events[100];
epoll_wait(epfd, events, 100, -1);
解决ET一次读不完的缺点, 需要改变epoll为非阻塞等待: 使用fcntl()修改套接字属性为非阻塞, 然后轮询的读(while(len = read() > 0)), 这样read()不阻塞还只调用了一次epoll_wait, 优于LT. 否则即使LT不阻塞, 但没读完也要重新调一次epoll_wait()
flag = fcntl(connfd, F_GETFL);
flag |= O_NONBLOCK;
fcntl(connfd, F_SETFL, flag);
epoll反应堆模型
反应堆模型防止client不能写的情况下还去写, 造成服务器阻塞, 比如滑动窗口满了的状态或者client意外关闭(TCP会传RST给server).
普通流程:
epoll_wait() --- 服务器 --- 监听 --- lfd/cfd --- 可读事件 --- epoll_wait()返回 --- read --- 小写转大写 --- write --- epoll_wait()继续监听
反应堆流程:
epoll_wait() --- 服务器 --- 监听 --- lfd/cfd --- 可读事件 --- epoll_wait()返回 --- read --- cfd从红黑树上摘下 --- 设置监听cfd写事件及回调函数,]节点上树 --- 进行操作(如小写转大写) --- 等待epoll_wait()返回 --- write回写客户端 --- cfd从树上摘下 --- 设置监听cfd读事件及回调函数 --- epoll_wait()继续监听
回调函数的参数就是struct myevent_s结构体本身, 该结构体存储了fd, 回调函数及其参数, 最后活跃时间等各种需要用到的变量. 可以通过最后活跃时间关闭60秒内不和服务器通信的客户端.
网友评论