by shihang.mai
1. 概述
通过一个系统调用,获取多个IO状态,叫多路复用器。在Linux下多路复用器都是同步模型。(只要程序自己读写IO,那么IO模型就是同步的)
多路复用器只关注IO:不关注从IO读写完之后的事情
同步:app自己R/W
异步:kernel完成R/W 只有win:iocp
阻塞:blocking
非阻塞:non-blocking
linux以及成熟的框架netty:同步非阻塞 同步阻塞
2. select poll
select poll系统调用
- 调用socket()返回一个fd
- 调用bind(fd,9090),将fd和端口绑定起来
- 调用listen(),监听端口
- 调用select(fds)查找有状态的IO
- 调用recv(有状态的fd)
其实无论NIO SELECT POLL都需要遍历所有的IO询问状态,只不过:
-
普通NIO:这个遍历成本在用户态切换内核态
-
select poll :这个遍历过程只触发了一次系统调用,用户态内核态的切换,过程中,把fds传递给内核,内核重新根据用户这次调用传过来的fds,遍历,修改状态
2.1 弊端
- 每次重复传递fds
- 每次内核被调用之后,针对这次调用,触发一次遍历fds全量的复杂度
3. epoll
3.1 概述
epoll现象调用过程
- 首先,执行无论BIO NOI SELECT poll都有的socket->bind->listen,如listen得到fd4->socket连接
- 调用epoll_create,创建fd6(叫epfd)->红黑树
- 调用epoll_ctl(fd6,add,fd4,accept),把fd4加入到fd6红黑树中
- 调用epoll_wait等待一个链表,这时链表没数据
- 当客户端通过网卡发送消息时,会在fd4的buffer中写数据,并且做一个延伸处理(原来的中断中加入延伸逻辑),将fd4在fd6中查找,并改变状态复制到链表中,这时链表含有fd4。
- 这时调用epoll_wait的话直接得到有状态的IO。
系统调用过程
- 调用socket()返回一个fd,例如fd1
- 调用bind(fd1,9090),将fd和端口绑定起来
- 调用listen(),监听端口
- 调用epoll_create(),产生一个红黑树,当然也会产生一个fd代表这个红黑树,例如fd2
- 调用epoll_ctl(fd2,add,fd1,accept),意思是在红黑树添加fd1,并且关注的是accept事件
- 调用epoll_wait(),它等待的是一个链表
- 当有一个连接过来,即是fd1的accept事件,就会将fd1转移到链表中,epoll_wait拿到了后accept产生一个fd3,调用epoll_ctl(fd2,add,fd3,recv)重新加入到红黑树
- 当这个链接发消息过来,就会将这个fd3移动到链表里
- 那么epoll_wait拿到的都是有状态的IO
优势:
- epoll直接调用epoll_ctl增量加入新的fd,解决重复传入fds问题,
- 在内核中做了将有状态的IO直接copy到链表,调用epoll_wait直接拿到有状态的IO,解决了遍历fds问题
3.2 举例
public class SocketMultiplexingSingleThreadv1 {
private ServerSocketChannel server = null;
private Selector selector = null;
int port = 9090;
public void initServer() {
try {
//这个server相当于listen状态的fd4
server = ServerSocketChannel.open();
//设置non-blocking
server.configureBlocking(false);
//绑定端口
server.bind(new InetSocketAddress(port));
/*
* 创建多路复用器 优先使用epoll
* select poll:创建一个数组用来存放fds
* epoll:调用epoll_create==>fd3
*/
selector = Selector.open();
/*
* select poll:把fd4放入数组
* epoll:调用epoll_ctl(fd3,add,fd4,epoll_in) 将fd4放入 上面epoll_create得到的内存空间fd3(实际运行时在selector.select时才放进去,懒加载),注册accept事件
*/
server.register(selector, SelectionKey.OP_ACCEPT);
} catch (IOException e) {
e.printStackTrace();
}
}
public void start() {
initServer();
System.out.println("服务器启动了。。。。。");
try {
while (true) {
/*
* select poll:查看数组中有的fds
* epoll:查看fd3 红黑树中的fds
*/
Set<SelectionKey> keys = selector.keys();
System.out.println(keys.size()+" size");
/*
* select poll:传入fds,查找有状态的IO
* epoll:epoll_wait
*/
while (selector.select(500) > 0) {
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iter = selectionKeys.iterator();
//无论基于那种多路复用器,得到有状态的IO后,都要自行R/W
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
//是否需要accept。服务端未accept前,客户端建立连接并发消息,就会进到这里
if (key.isAcceptable()) {
acceptHandler(key);
//是否可读。即是否已经分配进程处理socket连接
} else if (key.isReadable()) {
readHandler(key);
}
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
public void acceptHandler(SelectionKey key) {
try {
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
//accept得到client
SocketChannel client = ssc.accept();
//设置client non-blocking 为了读不阻塞
client.configureBlocking(false);
//创建缓冲区
ByteBuffer buffer = ByteBuffer.allocate(8192);
/*
*注册当前fd对应的read buffer
*/
client.register(selector, SelectionKey.OP_READ, buffer);
System.out.println("-------------------------------------------");
System.out.println("新客户端:" + client.getRemoteAddress());
System.out.println("-------------------------------------------");
} catch (IOException e) {
e.printStackTrace();
}
}
public void readHandler(SelectionKey key) {
SocketChannel client = (SocketChannel) key.channel();
ByteBuffer buffer = (ByteBuffer) key.attachment();
buffer.clear();
int read = 0;
try {
while (true) {
read = client.read(buffer);
if (read > 0) {
buffer.flip();
while (buffer.hasRemaining()) {
client.write(buffer);
}
buffer.clear();
} else if (read == 0) {
break;
} else {
client.close();
break;
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
SocketMultiplexingSingleThreadv1 service = new SocketMultiplexingSingleThreadv1();
service.start();
}
}
这个红黑树观察的文件描述符是有限的,属性max_user_watch
4. epoll单线程/多线程
场景:服务端接收到客户端数据,写回给客户端
单线程_多线程epoll4.1 单线程
-
主线程通过selector查找有状态的IO,然后调用readHandler(),readHandler读取后直接write
-
主线程通过selector查找有状态的IO,然后调用readHandler(),readHandler里将当前fd.register selector OP_WRITE,那么再查找有状态的IO时,就会触发writeHandler(),再去write
4.2 多线程
- 主线程通过selector查找有状态的IO,然后new thread()去执行readHandler(),readHandler里将当前fd.register selector OP_WRITE,但这个操作需要时间,当抛出线程,立刻返回,再一次通过selector查找有状态的IO,仍然会有当前这个fd。所有会重复调起readHandler()
- 当1中注册了写事件,而写事件是看send-q有没空位,所以当主线程再次通过selector查找有状态的IO,会重复new thread()调起writeHandler
解决重复调用办法:在调用readHandler()和writeHandler()前加入key.cancle(epoll_ctl(del))
4.2.1 多线程弊端:
-
考虑资源利用,充分利用cpu核数。
-
考虑有一个fd执行耗时长,在一个线程里会阻塞后续的fd的处理
但是会重复register和cancle系统调用。
4.3 解决
- 当有N个fd有R/w处理时,将N个FD分组,每一个组一个selector,将一个selector压到一个线程上
- 最好的线程数量时cpu个数或者cpu个数*2
- 其实单看一个线程,里面只有一个selector,有一部分fd,它们在自己的cpu上执行,代表会有多个selector并行,并且线程内部线性的,最终是并行的fd被处理
- 那么我也可以拿出一个线程的selector只关注accept,然后将接受的fd分配给其他线程selector
网友评论