写程序难免与 IO 打交道,无论是文件 IO、网络 IO,涉及到数据的存储、交换,大多数情况都会见到IO 的身影。这次一起看下 IO 相关的一些知识,主要以网络 IO 为例进行展开。
基础概念
首先讲下几个概念,这里看不懂也没关系,结合下文就会明白了,这些概念都是基于 Linux 系统。
FD
在 Linux 下,一切皆文件。无论是普通文件,目录文件,字符设备文件(如键盘,鼠标...),块设备文件(如硬盘,光驱...),套接字(网络通信)等等,所有一切均抽象成文件,提供了统一的接口,方便应用程序调用。
FD 是 File descriptor 的缩写,也就是文件描述符。当应用程序请求内核打开、新建一个文件时,内核会返回一个非负整数(每个进程都从 0 开始分配)用于标记对应这个打开、新建的文件,这个数字就是文件描述符,文件描述符可以看成是文件的 id。后续应用程序都是通过这个文件描述符来操作相应的文件。
系统为了维护文件描述符建立了 3 个表:
- 进程级的文件描述符表
- 系统级的文件描述符表
- 文件系统的i-node表
通过上图可以看出,每个进程有自己的文件描述符表,通过文件描述符表可以关联到系统的文件描述符表,而系统描述符表存储了 inode 指针,通过inode指针就可以找到真实的文件进行操作了。而什么是 inode 表可以看下阮一峰大佬的博客,写的非常清楚。
文件描述符限制
文件描述符是一个重要的系统资源,理论上系统内存多大就应该可以打开多少个文件描述符,但是实际情况是,内核会有系统级限制,以及用户级限制(不让某一个应用程序进程消耗掉所有的文件资源),可以通过下面命令进行查看、修改。
用户级限制 | 系统级限制 | |
---|---|---|
查看 | ulimit -n | sysctl -a | grep -i file-max --color; cat /proc/sys/fs/file-max |
修改(重启失效) | ulimit -SHn 65535 | sysctl -w fs.file-max=65535; echo 65535 > /proc/sys/fs/file-max |
修改(永久生效) | 修改配置文件: /etc/security/limits.conf 在最后一行加入: * - nofile 65535 |
修改配置文件: /etc/sysctl.conf 在任意位置加入: fs.file-max=65535 执行(不执行则不生效): sysctl -p |
内核态、用户态
从上图可以看出来 Linux 系统大致分为这三层。
内核态从本质上说就是我们所说的内核,它是一种特殊的软件程序,用于控制计算机的硬件资源,例如协调 CPU 资源,分配内存资源,并且提供稳定的环境供应用程序运行。
而用户态就是提供应用程序运行的空间,为了使应用程序访问到内核管理的资源例如 CPU,内存,I/O。内核必须提供一组通用的访问接口,这些接口就叫系统调用就是图中 system-call interface to the kernel。
系统调用
系统调用时操作系统的最小功能单位。根据不同的应用场景,不同的 Linux 发行版本提供的系统调用数量也不尽相同,大致在 240-350 之间。这些系统调用组成了用户态跟内核态交互的基本接口,例如:用户态想要申请一块 20K 大小的动态内存,就需要 brk 系统调用。
在执行系统调用的时候就会从用户态切换到内核态,而调用结束后又会从内核态切换到用户态,整个过程对资源的消耗较大。因为过程中会涉及到 CPU 的中断(软中断)、堆栈的切换、寄存器的保存等各种现场保护工作,以便待调用结束之后程序能继续执行下去。
总体来说系统调用是昂贵的。
IO演进
BIO
BIO(Blocking IO)即阻塞 I/O,数据的读取写入必须阻塞在一个线程内,直至 IO 完成。
/**
* BIO实现的Server
* @author 鱼蛮 on 2021/3/8
**/
@Slf4j
public class ServerBio {
public static void main(String[] args) throws IOException {
ServerSocket serverSocket = new ServerSocket(8090);
log.info("start server at {}", serverSocket.getLocalPort());
while (true) {
Socket socket = serverSocket.accept();
log.info("accept socket,ip:{},port:{}", socket.getInetAddress().getHostAddress(), socket.getPort());
new Thread(() -> {
String line;
try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()))) {
while (true) {
// 读取
line = bufferedReader.readLine();
log.info("socket:{},input:{}", socket.getPort(), line);
// 应答
bufferedWriter.write("receive:" + line);
bufferedWriter.newLine();
bufferedWriter.flush();
}
} catch (Exception e) {
// do nothing
}
}).start();
}
}
}
上面这段程序就是基于 BIO 模型的一个简单服务端实现,我们可以看到由于是阻塞 IO,当主线程接收到一个连接之后如果不想被 Block 住,那么就必须新建一个线程来处理这个连接,这样才能接续监听新的请求。当然如果分成两个线程一个负责接收,一个负责处理请求,接收线程的确不会被 Block 住,但是负责处理请求的线程会一直被某个 Socket Block 住,直至处理完成才会处理下一个。
也就是说基于 BIO 模型,每来一个新连接都必须建立一个新的线程与之对应进行处理。而线程其实是系统宝贵的资源,本身系统线程数量就存在限制,而且太多的线程将导致过多的 CPU 资源浪费在线程调度上面。所以基于 BIO 的模型无法支撑单机过多的连接访问,大家可以搜索看下 C10K 问题。
但是 BIO 模型简单、易懂,在少量连接情况下会有很好的性能表现。
NIO
NIO(Non-blocking IO)即非阻塞 I/O,Socket 的数据读取、写入将不再是阻塞的,当没有数据输入的时候将返回特殊的值。
/**
* NIO实现的Server
* @author 鱼蛮 on 2021/3/8
**/
@Slf4j
public class ServerNio {
public static void main(String[] args) throws IOException {
LinkedList<SocketChannel> sockets = new LinkedList<>();
ServerSocketChannel serverSocket = ServerSocketChannel.open();
serverSocket.bind(new InetSocketAddress(8090));
serverSocket.configureBlocking(false);
log.info("start server at {}", serverSocket.socket().getLocalPort());
while (true) {
SocketChannel socketChannel = serverSocket.accept();
if (Objects.isNull(socketChannel)) {
// do nothing
} else {
log.info("accept socket,ip:{},port:{}", socketChannel.socket().getInetAddress().getHostAddress(), socketChannel.socket().getPort());
socketChannel.configureBlocking(false);
sockets.add(socketChannel);
}
ByteBuffer buffer = ByteBuffer.allocate(2048);
for (SocketChannel item : sockets) {
// 读取
if (item.read(buffer) <= 0) {
continue;
}
buffer.flip();
byte[] bytes = new byte[buffer.limit()];
buffer.get(bytes);
String recStr = new String(bytes);
buffer.clear();
log.info("socket:{},input:{}", item.socket().getPort(), recStr);
// 应答
buffer.put(("receive:" + recStr).getBytes(StandardCharsets.UTF_8));
buffer.flip();
item.write(buffer);
buffer.clear();
}
try {
TimeUnit.MILLISECONDS.sleep(10L);
} catch (InterruptedException e) {
// do nothing
}
}
}
}
可以看出来使用 NIO 模型,只用一个线程就可以建立服务器端了,而且可以同时处理多个 Socket 的请求。但其实当调用 SocketChannel.read() 的时候实际执行的是“系统调用”,因为应用程序是不知道网络数据是否已经就绪,我们是需要询问内核来读取数据的。
我们上面说了“系统调用”是很昂贵的,当存在很多 Socket,而这些 Socket 又没多活跃的时候,这样频繁的系统调用就造成了很大的资源浪费。
IO 多路复用
由于频繁的执行“系统调用”造成了过多的资源浪费,那我们能不能一次传递多个 Socket 给内核,内核再一次性告诉我们那些已经完成了呢?这样就减少了“系统调用”的次数。因此产生了多路复用技术,多路复用基本思想就是一次传递多个需要询问的对象,内核告诉我哪些已经准备就绪。
select
最早出现的是 select 函数,我们先看下函数方法,可以通过:mau 2 select 查看这个方法的介绍。
select(int nfds, fd_set *restrict readfds, fd_set *restrict writefds, fd_set *restrict errorfds, struct timeval *restrict timeout);
在 select 方法中,支持了传入 readfds,即多个“文件描述符”也就是多个 Socket。但是一次传入的数量受操作系统的限制。
poll
poll(struct pollfd fds[], nfds_t nfds, int timeout);
之后出现的 poll 函数整体功能与 select 一致,可以看成是 select 的增强版本,去掉了传入文件描述符数量的限制。
epoll
select/poll 虽然解决了频繁进行“系统调用”的问题,但是传递 fd 的时候依然需要从用户态 copy 到内核态,内核态也需要频繁的遍历这些 fd,而且这个过程是循环进行,还是造成了不少的资源浪费。而 epoll 的出现就是为了解决这个问题,epoll 核心命令为:
- epoll_create:在内核申请一块内存区域用于存储需要监控的 fd,采用红黑树结构存储。
- epoll_ctl:将需要监控的 fd 添加到上面创建的集合里面。
- epoll_wait:获取就绪的 fd 集合,操作系统会将执行就绪的 fd 放入就绪集合中(采用链表存储),epoll_wait 检查这个集合即可。
使用 epoll 很好的解决了上面的问题,总体来说 epoll 在内核态开辟了两个集合,采用了空间换时间的方式来提升执行效率。
/**
* NIO实现的Server,利用了多路复用器,NIO最终实现方式是由系统决定,默认为epoll,也可能是select或者poll,可以通过参数指定
* socket() = 4
* fcntl(4, NONBLOCK)
* bind(4, 8090)
* listen(4)
*
* epoll_create(256) = 7
*
* epoll_ctl(7, EPOLL_CTL_ADD, 4)
*
* while(true) {
* epoll_wait(7, 2000)
*
* // 新连接
* accept(4) = 8
* epoll_ctl(7, EPOLL_CTL_ADD, 8)
* }
*
* @author 鱼蛮 on 2021/3/8
**/
@Slf4j
public class ServerNioMultiplex {
private Selector selector;
private ByteBuffer buffer;
private void initServer(int port) throws IOException {
ServerSocketChannel serverSocket = ServerSocketChannel.open();
serverSocket.configureBlocking(false);
serverSocket.bind(new InetSocketAddress(port));
// 对epoll相当于epoll_create,而对select,poll相当于建了个数组而已
this.selector = Selector.open();
// 对epoll相当于epoll_ctl,而对select,poll相当于网数组里面添加一个值
serverSocket.register(this.selector, SelectionKey.OP_ACCEPT);
buffer = ByteBuffer.allocate(2048);
log.info("start server at {}", serverSocket.socket().getLocalPort());
}
public void start(int port) throws IOException {
// 初始化服务
initServer(port);
// 监听服务
while (true) {
// 对epoll相当于epoll_wait,而对select,poll相当select(num,fds)
if (this.selector.select(2000) <= 0) {
continue;
}
Set<SelectionKey> keys = this.selector.selectedKeys();
Iterator<SelectionKey> it = keys.iterator();
while (it.hasNext()) {
SelectionKey key = it.next();
it.remove();
// 客户端已经发送数据
if (key.isReadable()) {
SocketChannel socketChannel = (SocketChannel) key.channel();
int readCount = socketChannel.read(buffer);
// 客户端断开连接
if (readCount == -1) {
key.cancel();
socketChannel.close();
log.info("socket:{},exit", socketChannel.socket().getPort());
continue;
}
buffer.flip();
byte[] bytes = new byte[buffer.limit()];
buffer.get(bytes);
String recStr = new String(bytes);
log.info("socket:{},readCount:{},input:{}", socketChannel.socket().getPort(), readCount, recStr);
buffer.clear();
// 应答
buffer.put(("receive:" + recStr ).getBytes(StandardCharsets.UTF_8));
buffer.flip();
socketChannel.write(buffer);
buffer.clear();
} // 新客户端连接
else if (key.isAcceptable()) {
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
socketChannel.register(this.selector, SelectionKey.OP_READ);
log.info("accept socket,ip:{},port:{}", socketChannel.socket().getInetAddress().getHostAddress(), socketChannel.socket().getPort());
}
}
}
}
public static void main(String[] args) throws IOException {
ServerNioMultiplex serverNioMultiplex = new ServerNioMultiplex();
serverNioMultiplex.start(8090);
}
}
上面这段程序就是使用多路复用的一个小例子,Java NIO API 会根据操作系统来选择使用 select、poll、epoll,默认使用的是 epoll。
在一个线程中如果既处理新 Socket 的连接,又处理已连接 Socket 的读写,可能会因为某个连接的处理较慢而导致整个过程阻塞。所以一般的服务框架会采用多线程进行处理,例如一个线程负责监听新连接的进入,再开几个线程来分别负责多个 Socket 的读写。下面这段程序就是多路复用的多线程版本,Tomcat 的 NIO 线程模型也是类似的处理过程。
/**
* NIO实现的Server,利用了多路复用器,多线程版本,跟tomcat的nio模型类似
* @author 鱼蛮 on 2021/3/8
**/
@Slf4j
public class ServerNioMultiplexMT {
public void start(int port, int threadNum) throws IOException {
ServerSocketChannel serverSocket = ServerSocketChannel.open();
serverSocket.configureBlocking(true);
serverSocket.bind(new InetSocketAddress(port));
AtomicInteger counter = new AtomicInteger();
Poller[] pollers = new Poller[threadNum];
for (int i = 0; i < threadNum; i++) {
pollers[i] = new Poller();
new Thread(pollers[i]).start();
}
log.info("start server at {}", serverSocket.socket().getLocalPort());
// 主线程监听连接
while (true) {
SocketChannel socketChannel = serverSocket.accept();
log.info("accept socket,ip:{},port:{}", socketChannel.socket().getInetAddress().getHostAddress(), socketChannel.socket().getPort());
pollers[counter.getAndIncrement() % pollers.length].register(socketChannel);
}
}
public static class Poller implements Runnable {
private final Selector selector;
private final ByteBuffer buffer;
private final BlockingQueue<SocketChannel> queue;
public Poller() throws IOException {
this.selector = Selector.open();
this.buffer = ByteBuffer.allocate(2048);
queue = new LinkedBlockingDeque<>(1024);
}
@SneakyThrows
public void register(SocketChannel socketChannel) {
this.queue.put(socketChannel);
}
@SneakyThrows
@Override
public void run() {
while (true) {
SocketChannel socketChannel = queue.poll();
if (Objects.nonNull(socketChannel)) {
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
}
// 没有事件
if (this.selector.select(1000) <= 0) {
continue;
}
Set<SelectionKey> keys = this.selector.selectedKeys();
Iterator<SelectionKey> it = keys.iterator();
while (it.hasNext()) {
SelectionKey key = it.next();
it.remove();
// 客户端是否已经发送数据
if (!key.isReadable()) {
continue;
}
SocketChannel channel = (SocketChannel) key.channel();
// 读取数据
int readCount = channel.read(buffer);
// 客户端断开连接
if (readCount == -1) {
key.cancel();
channel.close();
log.info("socket:{},exit", channel.socket().getPort());
continue;
}
buffer.flip();
byte[] bytes = new byte[readCount];
buffer.get(bytes);
String recStr = new String(bytes);
log.info("socket:{},readCount:{},input:{}", channel.socket().getPort(), readCount, recStr);
buffer.clear();
// 应答
buffer.put(("receive:" + recStr ).getBytes(StandardCharsets.UTF_8));
buffer.flip();
channel.write(buffer);
buffer.clear();
}
}
}
}
public static void main(String[] args) throws IOException {
ServerNioMultiplexMT serverNioMultiplexMT = new ServerNioMultiplexMT();
serverNioMultiplexMT.start(8090, 2);
}
}
到这里我们可以看出 IO 技术的发展像是一个逐步解决问题的过程,其实整体技术的发展不也是这个过程吗?拿数据库举例,当单表瓶颈的时候我们需要分表,当分表也存储不下或者单机瓶颈的时候就需要分库。
同步 IO、异步 IO
说下我的理解,内核态读取完毕数据后是否主动写入到用户空间,如果是则为异步,否则都为同步。像 BIO、NIO、多路复用的 select、poll、epoll 都属于同步 IO。
在 Windows 系统中提供的IOCP机制实现了真正的异步 IO,Linux 系统中虽然也有异步 IO 的一些实现但是都不算成熟,存在一定性能问题。Java的 AIO Api 在 Windows 系统是使用的操作系统机制实现,而在 Linux 系统中是使用自建线程池来模拟实现的。这也是造成异步 IO 在 Linux 环境中没有大规模应用的原因吧,像很多知名的中间件如 Redis、Nignx 都是使用 epoll 机制实现。
其他
开启服务三部曲
任何网络服务开启都需要如下三次系统调用来完成服务的启动、绑定端口、监听。
- socket
- bind
- listen
epoll 三部曲
使用 epoll 机制的都需要如下三次系统调用来使用 epoll
- epoll_create
- epoll_ctl
- epoll_wait
系统调用查看
可以使用 strace 命令进行进程的系统调用追踪,同样可以用 man strace 进行详细的命令介绍查看。
示例里面使用的是 slf4j 进行的日志输出,最好修改为 System.out.println 就不需要再去加载依赖包了。例如我们使用 strace 命令监控一段我们上面的程序:strace -ff -o out java com/blackbread/io/ServerNioMultiplex
我们就可以观察到下面的输出信息,下图是 socket 的创建、监听过程。
socket创建下面是 epoll 的创建、使用过程。
epoll创建 添加监听下面是一个新客户端连入的过程,以及对新客户端的读写过程
新客户端接入 分配描述符 描述符监控 读取输入 输出信息简单客户端连接工具
nc命令,例如启动上面任意一段代码的程序后,执行 nc 127.0.0.1 8090,在控制台将输出如下信息:
开启服务在命令行窗口输入如下信息之后
nc 连接在控制台也将受到输入,并反馈信息给 nc 的客户端。
服务端交互
网友评论