1. 概览
Java中主要有三种IO模型,分别是同步阻塞IO(BIO)、同步非阻塞IO(NIO)、异步非阻塞IO(AIO)。
Java IO的演进,其实是依赖于操作系统的IO操作接口实现。比如在Linux 2.6以后,Java中NIO和AIO都是通过epoll来实现的;而在Windows上,AIO是通过IOCP来实现的。
Linux(UNIX)操作系统中共有五种IO模型,分别是:阻塞IO模型、非阻塞IO模型、IO复用模型、信号驱动IO模型、异步IO模型。
回顾
同步与异步
- 同步:发起一个调用后,被调用者未处理完请求前,不向调用者返回响应。
- 异步:发起一个调用后,被调用者收到请求后立刻向调用者回应已接收到请求,但是被调用者并没有返回结果,此时调用者可以处理其他操作,被调用者通常依靠事件,回调等机制来通知调用者其返回结果。
阻塞和非阻塞
- 阻塞:发起一个请求后,调用者一直等待请求结果返回,也就是当前线程会被挂起,无法从事其他任务,只有当条件就绪才能继续。
- 非阻塞:发起一个请求后,调用者不用一直等着结果返回,可以先去干其他事情。
这里介绍Java BIO/NIO/AIO 是结合Socket网络I/O而谈,位于 java.net.*
包下。BIO 使用 java.io.*
包下阻塞IO;NIO使用 java.nio.*
包下非阻塞IO;AIO也就是NIO2.0版,是在NIO基础上提供异步支持。
NIO 是 JDK1.4 提供的API。nio中n有两层含义:
- new: 表示新的io接口
- Non-blocking: 非阻塞
AIO 是 JDK1.7 在
java.nio.*
包下提供的API
2. BIO
Blocking I/O,同步阻塞I/O模式,数据的读取写入必须阻塞在一个线程内等待其完成。
我们常说的I/O一般都是指BIO。
2.1. BIO 基本模型
BIO基本模型.png2.1.1. 特点描述
Socket 的连接(accept()
),数据的读写(read()
/write()
),都是阻塞的。请求一旦建立连接,就无法再接收其他连接。
2.1.2. 代码示例
public class SocketIO {
public static void main(String[] args) throws Exception {
ServerSocket server = new ServerSocket(9090); // #1
System.out.println("1. new ServerSocket(9090)");
Socket client = server.accept(); // 阻塞1 // #2
System.out.println("2. client connect\t" + client.getPort());
InputStream in = client.getInputStream();
BufferedReader reader = new BufferedReader(new InputStreamReader(in));
System.out.println(reader.readLine()); // 阻塞2 // #3
while (true) {
}
}
}
上述代码中,有两处阻塞,分别是 server.accept()
和 reader.readLine()
。
使用
strace -ff -o outlog java SocketIO
命令查看JVM调用了内核(kernel)的方法过程
#1
位置 new ServerSocket(9090)
,调用了内核:
socket(···) = 5 # 创建一个未连接的Socket,5是个fd(文件描述符)指向这个socket
bind(5, 9090, ···) # 绑定9090端口到socket上
listen(5, ···) # 把这个socket监听起来
#2
位置 server.accept()
,调用了内核:
poll([{fd=5, ···}], 1, -1) = 1 ([{fd=5, revents=POLLIN}]) # 等待请求过来连接。如果没有请求将阻塞,-1表示无限阻塞;有请求时,5fd退出监听交给accept建立连接
accept(5, {客户端信息}, [28]) = 6 # 从5fd中创建一个新的socket,连接client和server,释放5fd,让其继续处于监听状态,等待下一个连接
# 如果使用jdk1.4之前的版本运行,只调用了内核的 accept 方法,接收连接阻塞在此处
#3
位置 server.accept()
,调用了内核:
recvfrom(6, "hahahaha\n", 8192, 0, NULL, NULL) = 9 # 从6fd这个socket上读取接收到的数据,如果socket上没有消息,将在此阻塞
# 如果使用jdk1.4之前的版本运行,调用了内核的 recv 方法,读数据阻塞在此处
2.1.3. 问题瓶颈
主要问题突出点:
- 接收客户端连接阻塞
- 读取/写入数据阻塞
- 只能接收一个连接
2.2. BIO 传统模型
BIO服务通信模型.png2.2.1. 特点描述
服务端使用一个Acceptor线程,用于监听接收请求,收到请求后,创建Socket和一个新线程来服务客户端。
这种模型特点是请求数与线程数1:1
2.2.2. 代码示例
public class SocketIO2 {
public static void main(String[] args) throws Exception {
ServerSocket server = new ServerSocket(9090);
System.out.println("1. new ServerSocket(9090)");
while (true) {
Socket client = server.accept(); // 阻塞1
System.out.println("2. client connect\t" + client.getPort());
Thread thread = new Thread(() -> {
try {
InputStream in = client.getInputStream();
BufferedReader reader = new BufferedReader(new InputStreamReader(in));
String line;
while ((line = reader.readLine()) != null) {
System.out.println(line); // 阻塞2
}
} catch (IOException e) {
e.printStackTrace();
}
System.out.println("client closed\t" + client.getPort());
});
thread.start();
}
}
}
上述代码,可以实现多个客户都安连接,但是还存在两处阻塞。一个连接,占用一个线程。
2.1.3. 问题瓶颈
主要问题突出点:
- 接收客户端连接阻塞
- 读取/写入数据阻塞
- 服务器不能连接过多请求(C10K问题)
2.3. BIO 伪异步模型
BIO伪异步模型.png2.3.1. 特点描述
服务端的连接响应,使用线程池管理。可实现请求数与线程m:n(m可以大于n)
服务端性能的优化,取决于线程池的优化
2.3.2. 代码示例
public class SocketIO3 {
private static int THREAD_POOL_SIZE = 1;
private static ThreadPoolExecutor THREAD_POOL = getThreadPoolExecutor();
private static ThreadPoolExecutor getThreadPoolExecutor() { // 服务性能的优化,取决于线程池的优化
ThreadFactory threadFactory = new ThreadFactory() {
private final AtomicInteger mThreadNum = new AtomicInteger(1);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "server-thread-" + mThreadNum.getAndIncrement());
}
};
return new ThreadPoolExecutor(THREAD_POOL_SIZE, THREAD_POOL_SIZE, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(THREAD_POOL_SIZE), threadFactory);
}
public static void main(String[] args) throws Exception {
ServerSocket server = new ServerSocket(9090);
System.out.println("1. new ServerSocket(9090)");
while (true) {
Socket client = server.accept(); // 阻塞1
System.out.println("2. client connect\t" + client.getPort());
try {
THREAD_POOL.execute(() -> responseHandler(client)); // 响应处理
} catch (RejectedExecutionException e) {
e.printStackTrace();
rejectedHandler(client); // 线程池已满,拒绝处理
}
}
}
private static void responseHandler(Socket client) {
try {
InputStream in = client.getInputStream();
BufferedReader reader = new BufferedReader(new InputStreamReader(in));
String line;
while ((line = reader.readLine()) != null) {
System.out.println(line); // 阻塞2
}
} catch (IOException e) {
e.printStackTrace();
}
System.out.println("client closed\t" + client.getPort());
}
private static void rejectedHandler(Socket client) throws Exception {
OutputStream out = client.getOutputStream();
BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(out));
writer.write("请求已达上线");
writer.close();
client.close();
}
}
2.3.3. 问题瓶颈
主要问题突出点:
- 接收客户端连接阻塞
- 读取/写入数据阻塞
- 服务器不能连接过多请求(比 2.2 会好一点)
2.4 总结
BIO从底层看,如果不能从底层根本上结果阻塞问题,其性能问题就不能得到有效解决。这种模型单一简单,适用于连接不多的场景。
3. NIO
- Non-blocking: NIO 流是非阻塞 IO 而 IO 流是阻塞 IO,非阻塞体现在 socket网络,内核机制上。
- new: NIO 有3个核心组件/特性:Channel、Buffer、Selector,体现在JDK上。
3.1. NIO 基本模型
3.1.1. 特点描述
Java中非阻塞IO,JDK中可通过 sockeChannel.configureBlocking(false)
把 accept()
和 read()/write()
设置成非阻塞,调用系统内核时,如果没有连接/数据读写,就返回-1。
SocketChannel 中提供数据的流入流出两个通道。数据读写时,先直接由操作系统将数据放入Buffer缓冲区中。JVM中Server对数据的读写直接访问Byffer缓冲区。
NIO数据读写模型.pngChannel 与 Buffer 搭配:
- 多个channel配一个buffer
- 一个channel配一个buffer(使用最多配置)
- 一个channel配两个buffer(一个读一个写)
Buffer 缓冲区创建位置:
- JVM内(堆内)
- JVM外(堆外,由OS管理分配)
服务器与外部收发数据,首先由OS管理,由系统将网卡数据放到对外,如果JVM想要使用,需要将数据拷贝到JVM内部,存在JVM内外来回拷贝,影响性能还浪费空间。
Linux内核支持共享空间(mmap),可以将网卡数据直接放到共享空间内存,这样JVM和OS就可以共同使用。Java中的 零拷贝 就是基于此。ByteBuffer 的创建位置,和 channel 的搭配方式根据实际应用场景灵活选择
3.1.2. 代码示例
public class SocketNIO {
private static final String CHARSET = "UTF-8";
public static void main(String[] args) throws Exception {
List<SocketChannel> clientList = new LinkedList<>();
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.bind(new InetSocketAddress(9090)); // 完成端口绑定,socket监听
ssc.configureBlocking(false); // false指定该ServerSocketChannel为非阻塞 // #1
System.out.println("1. ServerSocketChannel 9090 non-blocking");
while (true) {
Thread.sleep(1000);
SocketChannel client = ssc.accept(); // 接受连接非阻塞
if (client == null) {
System.out.print(".");
} else {
client.configureBlocking(false); // #2
System.out.println();
System.out.println("2. client connect\t" + client.socket().getPort());
clientList.add(client);
}
ByteBuffer buffer = ByteBuffer.allocateDirect(1024); // 配置缓冲区,可以堆内,可以堆外(allocate())
for (SocketChannel sc : clientList) {
int len = sc.read(buffer); // 读取数据,非阻塞
if (len > 0) {
buffer.flip();
byte[] bytes = new byte[buffer.limit()];
buffer.get(bytes);
buffer.clear();
String data = new String(bytes, CHARSET);
System.out.println();
System.out.println(String.format("received data[%-5d]: %s", sc.socket().getPort(), data));
}
}
}
}
}
#1
位置,让服务端接后客户端连接时 ssc.accept()
非阻塞
#2
位置,与客户端连接的Socket读/写数据时 sc.read(buffer)
非阻塞
运行代码,JVM调用了内核一下方法
socket(···) = 6
bind(6, 9090, ···)
listen(6, ···)
# 以上三步同BIO一样
# ssc.configureBlocking(false);
fcntl(6, F_SETFL, O_RDWR|O_NONBLOCK) = 0 # 设置6fd为非阻塞
# accept 不再阻塞。没有连接时,返回-1;有连接时,返回建立好的连接
accept(6, xxx, xxx) = -1 EAGAIN (Resource temporarily unavailable)
accept(6, {客户端信息}, [28]) = 7
# read 读取数据不在阻塞。channel没有数据返回-1
read(7, 0x7f92b428e590, 1024) = -1 EAGAIN (Resource temporarily unavailable)
read(7, "hello\n", 1024) = 6
3.1.3. 问题瓶颈
虽然NIO接收客户端连接和读/写数据都不在阻塞,但是上面的额连接、读写都在一个线程,操作是串行化的。高并发下,后面的读写操作降低了可接入连接的性能。可将客户端连接由一个线程处理,数据读写由两一个线程处理。
资源浪费:
-
while(true)
中,每次都要询问/调用accept()
,看是否由客户端请求接入。 - 如果接入了 1w/100w+ 个连接,每次都需要把所有连接
read()
一下看是否有数据过来(复杂度O(n))。
3.2. NIO 多路复用模型
3.2.1. 特点描述
Selector 多路复用器,JDK中使用Selector对象来注册/管理Channel(包含Server和Client),当有事件触发时,使用 selector.select()
和 selector.selectedKeys()
来获取有事件触发的Channel,我们根据事件类型来做相应处理。
注意Selector获取的是一个Channel状态,数据的读/写还是需要用户自己触发,即读写过程依然是同步。
Selector 的实现依赖内核支持,如:select/poll/epoll等
select/poll: 每次询问是否有事件到达,需要传入所有socket fd,只是所有fd的循环遍历交给了内存来完成(JDK不再干这个事)。
epoll: 先创建一个Selector fd,然后所有socket都添加到这个fd中,只添加一次即可。由这个fd管理所有socket的事件。
3.2.2. 代码示例
ServerSocketChannel
的有效事件为 OP_ACCEPT
。
SocketChannel
的有效事件为 OP_CONNECT
、OP_READ
、OP_WRITE
/**
* NIO,单线程多路复用
*/
public class SocketNIOMultiplexing {
private ServerSocketChannel server;
private Selector selector; // 多路复用器
public void initServer() throws IOException {
server = ServerSocketChannel.open();
server.bind(new InetSocketAddress(9090));
server.configureBlocking(false); // accept非阻塞
selector = Selector.open();
server.register(selector, SelectionKey.OP_ACCEPT); // 将ServerSocketChannel注册到Selector中,注册过程创建SelectionKey,表示Channel向Selector中注册的token
System.out.println("1. Server started [ServerSocketChannel Selector 9090]");
}
public void start() throws IOException {
initServer();
while (true) {
if (selector.select(0) < 1) { // 0表示每次询问不阻塞,立即返回;非0表示阻塞时间
continue;
}
Set<SelectionKey> selectionKeys = selector.selectedKeys(); // 取出有效的key
System.out.println("2. selector 中有事件状态进来 - size=" + selectionKeys.size());
for (Iterator<SelectionKey> it = selectionKeys.iterator(); it.hasNext(); it.remove()) { // 注意,这里key处理后要从keys中remove,不然下次会会获取没有事件key,poll进入死循环
SelectionKey selectionKey = it.next();
if (selectionKey.isAcceptable()) { // selectionKey中持用的Socket状态是可连接
acceptHandler(selectionKey); // 从socket(Server)中建立连接
} else if (selectionKey.isReadable()) { // selectionKey中持用的Socket状态是可读
readHandler(selectionKey); // 从socket(Client)中读取数据
}
}
}
}
private void acceptHandler(SelectionKey selectionKey) {
ServerSocketChannel ssc = (ServerSocketChannel) selectionKey.channel();
System.out.println("3.1 接受连接");
try {
SocketChannel client = ssc.accept();
client.configureBlocking(false); // 非阻塞
ByteBuffer buffer = ByteBuffer.allocate(1024); // 为该client配置一个buffer,堆内
client.register(selector, SelectionKey.OP_READ, buffer); // 将SocketChannel注册到Selector中
System.out.println("一个新客户端连接: client=" + client.getRemoteAddress());
} catch (IOException e) {
e.printStackTrace();
}
}
private void readHandler(SelectionKey selectionKey) {
SocketChannel sc = (SocketChannel) selectionKey.channel();
ByteBuffer buffer = (ByteBuffer) selectionKey.attachment();
System.out.println("3.2 读取数据");
buffer.clear(); // 读之前默认清空一次buffer
try {
while (true) {
int len = sc.read(buffer); // 读到数据返回数据字节长度;流没有结束,没有数据返回0,流结束返回-1
if (len > 0) {
buffer.flip();
while (buffer.hasRemaining()) {
sc.write(buffer); // 把数据直接返回去
}
buffer.clear();
} else if (len == 0) {
break;
} else { // -1 注意bug,当tcp连接处于 close_wait 时,selectionKey.isReadable()返回true,这里出现死循环,cpu飙高
sc.close(); // 去掉bug,这里client close,主要是使用 key.cancel(); 从多路复用器的key集合中移除
break;
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws IOException {
SocketNIOMultiplexing server = new SocketNIOMultiplexing();
server.start();
}
}
运行代码,JVM调用了内核一下方法
socket(···) = 6
bind(6, 9090, ···)
listen(6, ···)
# 以上三步同BIO一样
fcntl(6, F_SETFL, O_RDWR|O_NONBLOCK) = 0 # 设置6fd非阻塞
pipe([7, 8]) # 创建一个channel,7fd表示管道的读取端,8fd表示管道的写端
fcntl(7, F_SETFL, O_RDONLY|O_NONBLOCK) = 0 # 设置7fd只读端口非阻塞
fcntl(8, F_SETFL, O_WRONLY|O_NONBLOCK) = 0 # 设置8fd只写端口非阻塞
# 创建 Selector 实例
epoll_create(256) = 9 # 创建一个新的epoll实例9fd
# channel.register(selector)
epoll_ctl(9, EPOLL_CTL_ADD, 7, ···) = 0 # 将7fd注册到9fd上,并将事件连接到7fd上
epoll_wait(9, ···) # 阻塞,9fd上等待事件到来触发
# client连接
rt_sigaction(SIGRT_30, ···, 8) = 0 # 信号安装登记
accept(6, {客户端信息}, [28]) = 10
3.2.3. 问题瓶颈
上面的模型不需要每次询问 accept()
和所有client的 read()
,每次只需要询问一次多路复用器Selector即可,显然复杂度降为O(1),解决了资源浪费问题。
但是Selector询问、Channel客户端连接、数据读写依然串行化。可使用多线将这个三者分开,Netty的模型就是基于此设计。
3.3. NIO 多路复用模型-多线程
3.3.1. 特点描述
把 3.2 中的问题,进行多线程分开。boss线程接收连接请求后,快速记录,交给worker线程,worker线程负责耗时的accept和数据的读写操作。(这里就有些netty的味道了)
3.3.2. 代码示例
/**
* NIO,多路复用,多线程
*/
public class SocketNioMultiplexing2 {
private static final String CHARSET = "UTF-8";
private ServerSocketChannel server;
private NioThread boss;
private NioThread[] workers;
static class NioThread extends Thread {
private static volatile int workerNum = 0; // worker线程数量
private static BlockingQueue<SocketChannel>[] workerQueues; // SocketChannel队列,长度=workerNum
private static AtomicInteger workerBalance = new AtomicInteger();
private Selector selector; // 多路复用器
private Integer tid; // 线程ID,null表示boss线程
public void config(Selector selector, Integer tid) {
this.selector = selector;
this.tid = tid;
if (tid == null) {
System.out.println("boss thread ready");
this.setName("boss");
} else {
System.out.println("worker-" + tid + " thread ready");
this.setName("worker-" + tid);
}
}
@Override
public void run() {
System.out.println(">> " + Thread.currentThread().getName() + " start");
try {
if (tid == null) {
bossLoop();
} else {
workerLoop();
}
} catch (IOException e) {
e.printStackTrace();
}
}
private void bossLoop() throws IOException {
while (true) {
if (selector.select(10) < 1) { // 询问是否有事件到达,最多阻塞10ms
continue;
}
Set<SelectionKey> selectionKeys = selector.selectedKeys();
System.out.println("2. boss-selector 中有请求事件状态进来 - size=" + selectionKeys.size());
for (Iterator<SelectionKey> it = selectionKeys.iterator(); it.hasNext(); it.remove()) {
SelectionKey selectionKey = it.next();
if (!selectionKey.isAcceptable()) {
continue;
}
ServerSocketChannel ssc = (ServerSocketChannel) selectionKey.channel();
try {
SocketChannel client = ssc.accept();
client.configureBlocking(false);
int tid = workerBalance.getAndIncrement() % workerNum; // 负载均衡
workerQueues[tid].add(client);
System.out.println("3. 连接请求加入队列-" + tid);
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
private void workerLoop() throws IOException {
while (true) {
acceptHandler(workerQueues[tid]); // 建立连接
if (selector.select(10) < 1) {
continue;
}
Set<SelectionKey> selectionKeys = selector.selectedKeys();
System.out.println("2. worker-selector 中有读取事件状态进来 - size=" + selectionKeys.size());
for (Iterator<SelectionKey> it = selectionKeys.iterator(); it.hasNext(); it.remove()) {
SelectionKey selectionKey = it.next();
if (selectionKey.isReadable()) {
readHandler(selectionKey); // 读取数据
}
}
}
}
private void acceptHandler(BlockingQueue<SocketChannel> queue) throws IOException {
SocketChannel client = queue.poll();
if (client == null) {
return;
}
// 建立Channel连接,配置缓冲器
ByteBuffer buffer = ByteBuffer.allocate(1024); // 字节对齐?
client.register(this.selector, SelectionKey.OP_READ, buffer);
System.out.println("4. 一个新客户端连接: client=" + client.getRemoteAddress());
}
private void readHandler(SelectionKey selectionKey) {
SocketChannel sc = (SocketChannel) selectionKey.channel();
ByteBuffer buffer = (ByteBuffer) selectionKey.attachment();
System.out.println("3 读取数据");
buffer.clear(); // 读之前默认清空一次buffer
try {
while (true) {
int len = sc.read(buffer);
if (len > 0) {
buffer.flip();
while (buffer.hasRemaining()) { // Echo应答
ByteBuffer prefix = ByteBuffer.allocate(20);
prefix.put("Echo:".getBytes(CHARSET));
prefix.flip();
sc.write(new ByteBuffer[]{prefix, buffer});
}
buffer.clear();
} else if (len == 0) {
break;
} else {
sc.close();
break;
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
public SocketNioMultiplexing2 init(NioThread boss, NioThread[] workers) throws IOException {
this.boss = boss;
this.workers = workers;
server = ServerSocketChannel.open();
server.configureBlocking(false);
Selector boosSelector = Selector.open();
boss.config(boosSelector, null);
server.register(boosSelector, SelectionKey.OP_ACCEPT);
int workerNum = workers.length;
NioThread.workerNum = workerNum;
NioThread.workerQueues = new LinkedBlockingQueue[workerNum];
for (int i = 0; i < workerNum; i++) {
workers[i] = new NioThread();
workers[i].config(Selector.open(), i); // worker 线程,指定线程ID
NioThread.workerQueues[i] = new LinkedBlockingQueue<>();
}
return this;
}
public SocketNioMultiplexing2 port(int port) throws IOException {
server.bind(new InetSocketAddress(port));
return this;
}
public void start() throws IOException {
boss.start();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
for (NioThread worker : workers) {
worker.start();
}
System.out.println(String.format("1. Server started [boss: %s, workers: %d]", server.getLocalAddress().toString(), workers.length));
}
public static void main(String[] args) throws IOException {
NioThread boss = new NioThread(); // boss 线程
NioThread[] workers = new NioThread[2]; // worker 线程
SocketNioMultiplexing2 server = new SocketNioMultiplexing2();
server.init(boss, workers)
.port(9090)
.start();
}
}
4. AIO
Asynchronous I/O,异步非阻塞I/O模型。这里的 异步 是基于事件和回调机制实现的,也就是应用操作之后会直接返回,不会堵塞在那里,当后台处理完成,操作系统会通知相应的线程进行后续的操作。
在大多数业务场景中,我们往往在读/写数据时需要阻塞,获取读取到的数据/写入数据状态等。
目前来看,Linux上 AIO 的应用还不真正的异步,可以说时伪异步,Netty 之前也尝试使用过 AIO,不过又放弃回归到NIO上。
参考
网友评论