1 阻塞IO
传统的 IO 流都是阻塞式的。也就是说,当一个线程调用 accept(),read() 或 write()时,该线程被阻塞,直到有一些数据被读取或写入,该线程在此期间不能执行其他任务。
阻塞accept()
当没有客户端连接时,用户进程会一直阻塞。
阻塞IO读取
image对于一次IO读取,数据会先从网络设备缓冲区被拷贝到操作系统内核的缓冲区中,然后才会从操作系统内核的缓冲区拷贝到应用程序的缓冲区,最后返回。
如果在网络设备缓冲区中没有发现数据会导致应用程序进程阻塞等待。
阻塞IO写入
image对于一次IO写入,数据会先应用程序的缓冲区被拷贝到操作系统内核的缓冲区中,然后才会从操作系统内核的缓冲区拷贝到网络设备缓冲区,最后返回。
在网络阻塞严重的时候,由于网络设备的缓冲区数据无法发送到网络中,会一直堆积直到没有足够的内存来进行写操作,从而导致应用程序进程阻塞等待。
2 阻塞IO 单线程
@Test
public void test_server() throws Exception {
/** 创建服务器套接字通道 ServerSocketChannel **/
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
/** 绑定监听 InetSocketAddress **/
serverSocketChannel.bind(new InetSocketAddress("localhost", 8888));
/** 设置为阻塞IO模型 **/
serverSocketChannel.configureBlocking(true);
while (true){
System.out.println("阻塞等待客户端连接" );
SocketChannel socketChannel = serverSocketChannel.accept();
System.out.println("客户端连接成功");
ByteBuffer readbuf = ByteBuffer.allocate(100);
System.out.println("阻塞等待客户端请求数据" );
/**
* 等待 网络设备 --copy-->操作系统内核缓冲区
* 等待 操作系统内核缓冲区 --copy --> 用户内存
* 此函数一共经历2次拷贝等待
* **/
socketChannel.read(readbuf);
System.out.println("读取客户端请求数据成功" );
System.out.println(new String(readbuf.array()));
}
}
@Test
public void test_readcanunblock_client() throws Exception {
SocketChannel socketChannel = SocketChannel.open();
socketChannel.configureBlocking(true);
System.out.println("阻塞服务端端响应连接" );
socketChannel.connect(new InetSocketAddress("localhost", 8888));
System.out.println("连接服务端成功" );
/** 等待一段事件向客户端后向服务请求数据 **/
TimeUnit.SECONDS.sleep(30);
socketChannel.write(ByteBuffer.wrap("hello server".getBytes()));
}
由于整个接收请求和处理请求都是在同一个线程里,按照当前示例同时只能处理一个连接请求。如果某个请求很慢会导致其他请求阻塞。
image阻塞IO 多线程
使用多线程技术将请求操作交给子线程完成,可以让多个请求同时处理。
@Test
public void test_server2() throws Exception {
/** 创建服务器套接字通道 ServerSocketChannel **/
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
/** 绑定监听 InetSocketAddress **/
serverSocketChannel.bind(new InetSocketAddress("localhost", 8888));
/** 设置为阻塞IO模型 **/
serverSocketChannel.configureBlocking(true);
while (true){
System.out.println("阻塞等待客户端连接" );
SocketChannel socketChannel = serverSocketChannel.accept();
System.out.println("收到客户端");
new BeginThread(socketChannel).start();
}
}
image
这种方式每次一个新的连接,都会启动一个线程。如果存在1百万个连接,那么需要创建1百万个线程,JVM会对限制进程线程的数量,如果超过这时会抛出异常。即使设置支持1百万个线程,那么按照一个连接最少64k内存来算,64k*1000000 约 61G,也足以让OOM将进程杀死
阻塞IO 线程池
为了解决同步阻塞IO面临一个链路需要一个线程处理的问题。后端通过一个线程池来处理多个客户端的请求。通过线程池可以灵活的调配线程池资源,设置线程的最大值,防止由于海量并发请求导致线程耗尽。
@Test
public void test_server3() throws Exception {
/** 创建服务器套接字通道 ServerSocketChannel **/
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
/** 绑定监听 InetSocketAddress **/
serverSocketChannel.bind(new InetSocketAddress("localhost", 8888));
/** 设置为阻塞IO模型 **/
serverSocketChannel.configureBlocking(true);
while (true){
System.out.println("阻塞等待客户端连接" );
SocketChannel socketChannel = serverSocketChannel.accept();
System.out.println("收到客户端");
ExecutorService executorService = Executors.newFixedThreadPool(10);
executorService.execute(new BeginThread(socketChannel));
}
}
阻塞IO 线程池弊端
使用线程池是为管控进程内的线程资源。当遇到海量的请求时,线程池内部的工作线程数量N,一定远远小于请求任务数量M。当线程池内阻塞队列中任务满时只能阻塞新请求或拒绝新请求。这就需要我们能快速处理请求。然而阻塞IO 发起read() 或 write()时,该线程都是同步阻塞的。阻塞时间取决于对方IO线程的处理速度和网络IO的传输速度。相当于我们应用程序需要依赖对方处理速度,导致我们应用程序的可靠性降低。
案例
如果我们应用程序在和一个故障节点通信。
-
1 阻塞IO读取故障节点的数据,由于读取输入流是阻塞的。因此同步阻塞时间又平时的10S变成60S
-
2 假如大量线程池内工作线程都在读取这个故障节点数据,那么由于线程池内工作线程处理缓慢,导致新的请求处理不过来,被放入阻塞队列中。
-
3 由于阻塞队列容量有限。当超过限制时只能阻塞或拒绝新的请求。
2 非阻塞IO
Java NIO 是非阻塞模式的。当一个线程调用 accept(),read() 或 write()时,该线程不会被阻塞,会直接返回。
非阻塞accept()
相对于阻塞IO,非阻塞会直接返回。当不存在客户端连接时serverSocketChannel.accept()会返回一个NULL。来告知应用程序没有客户端连接。
非阻塞IO读取
相对于阻塞IO读取,非阻塞如果在网络设备缓冲区中没有发现数据不会阻塞而会直接返回。我们可以通过判断返回读取数据大小。来判断网络设备缓冲区中是否存在数据读取。
非阻塞IO写入
相对于阻塞IO写入,在网络阻塞严重的时候,由于网络设备的缓冲区数据无法发送到网络中,会一直堆积直到没有足够的内存来进行写操作,这时并不会阻塞当前线程而是写入失败返回。我们可以通过判断返回写入数据大小。判断是否写入成功。
/**
* 非阻塞 IO
*/
@Test
public void test_UnBlock_server() throws Exception {
/** 创建服务器套接字通道 ServerSocketChannel **/
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
/** 绑定监听 InetSocketAddress **/
serverSocketChannel.bind(new InetSocketAddress("localhost", 8888));
/** 设置为非阻塞IO模式 **/
serverSocketChannel.configureBlocking(false);
boolean is_Run = true;
while (is_Run) {
/** 非阻塞IO模式,accept()会立刻返回,如果请求没有到达返回null **/
SocketChannel socketChannel = serverSocketChannel.accept();
/** 判断请求是否到达 **/
if (Optional.ofNullable(socketChannel).isPresent()) {
boolean is_Read = true;
while (is_Run) {
/** 设置为非阻塞IO模式 **/
socketChannel.configureBlocking(false);
ByteBuffer byteBuffer = ByteBuffer.allocate(100);
/** 非阻塞IO模式,read()会立刻返回,,如果请求没有数据到达返回0 **/
int read = socketChannel.read(byteBuffer);
if (read > 0) {
System.out.println(new String(byteBuffer.array()));
break;
} else {
System.out.println("客户端请求数据未到达");
TimeUnit.SECONDS.sleep(1);
}
}
} else {
System.out.println("客户端请求连接未到达");
TimeUnit.SECONDS.sleep(2);
}
}
}
@Test
public void test_UnBlock_client() throws Exception {
SocketChannel socketChannel = SocketChannel.open();
/** 设置为非阻塞IO模式 **/
socketChannel.configureBlocking(false);
/** 非阻塞IO模式,accept()会立刻返回 **/
socketChannel.connect(new InetSocketAddress("localhost", 8888));
/** 等待客户端连接成功 **/
while (!socketChannel.finishConnect()){
}
/** 等待一段事件向客户端后向服务请求数据 **/
TimeUnit.SECONDS.sleep(5);
socketChannel.write(ByteBuffer.wrap("hello server".getBytes()));
System.out.println("数据已发送");
while (true) {
TimeUnit.SECONDS.sleep(1);
}
}
//服务端
客户端请求连接未到达
客户端请求连接未到达
客户端请求连接未到达
客户端请求连接未到达
客户端请求连接未到达
客户端请求数据未到达
客户端请求数据未到达
客户端请求数据未到达
客户端请求数据未到达
hello server
客户端请求连接未到达
客户端请求连接未到达
客户端请求连接未到达
客户端请求连接未到达
客户端请求连接未到达
//客户端
数据已发送
非阻塞IO 和 Linux 非阻塞I/O模型 向对应
image2 非阻塞IO + Selector
Selector 一般称为选择器,用来作为SelectableChannel通道的多路复用器。SelectableChannel类型通道可以被注册到多路复用器,通过多路复用器监听感兴趣的事件,这样就可以通过Selector实现单个线程可以管理多个SelectableChannel通道,从而管理多个网络连接。
image选择器对应 I/O多路复模型
image@Test
public void test_unBlock_selector_server() throws Exception {
/** 实例化一个选择器对象 **/
Selector selector = Selector.open();
/** 创建服务器套接字通道 ServerSocketChannel **/
ServerSocketChannel serverSocketChannel1 = ServerSocketChannel.open();
/** 绑定监听 InetSocketAddress **/
serverSocketChannel1.bind(new InetSocketAddress("localhost", 7777));
/** 设置为非阻塞IO模型 **/
serverSocketChannel1.configureBlocking(false);
/** 将serverSocketChannel通道注册到selector选择器中,并设置感兴趣的事件OP_ACCEPT,并返回SelectionKey**/
serverSocketChannel1.register(selector, SelectionKey.OP_ACCEPT);
boolean is_Run = true;
while (is_Run) {
/** 阻塞等待事件到达**/
selector.select();
/** 获取到达事件SelectionKey集合**/
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
/** 遍历SelectionKey**/
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
/** 将获取SelectionKey 从publicSelectedKeys集合中删除,防止重复处理**/
iterator.remove();
/** 判断OP_ACCEPT事件是否到达 **/
if (key.isAcceptable()) {
/** 从SelectionKey获取对应通道ServerSocketChannel**/
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
/** 获取SocketChannel**/
SocketChannel socketChannel = serverSocketChannel.accept();
/** 设置为非阻塞IO模型 **/
socketChannel.configureBlocking(false);
/** 将serverSocketChannel通道注册到selector选择器中,并设置感兴趣的事件OP_READ(当客户端请求数据时事件到达) **/
socketChannel.register(selector, SelectionKey.OP_READ);
/** 向客户端发送消息 **/
socketChannel.write(ByteBuffer.wrap("hello client".getBytes()));
}
/** 判断OP_READ事件是否到达 **/
else if (key.isReadable()) {
/** 从SelectionKey获取对应通道SocketChannel**/
SocketChannel socketChannel = (SocketChannel) key.channel();
/** 读取客户端发送数据 **/
ByteBuffer byteBuffer = ByteBuffer.allocate(100);
socketChannel.read(byteBuffer);
System.out.println(new String(byteBuffer.array()));
}
}
}
}
@Test
public void test_unBlock_selector_client() throws Exception {
/** 实例化一个选择器对象 **/
Selector selector = Selector.open();
/** 创建套接字通道 SocketChannel **/
SocketChannel socketChannel = SocketChannel.open();
/** 设置为非阻塞IO模型 **/
socketChannel.configureBlocking(false);
/** 发起连接 **/
socketChannel.connect(new InetSocketAddress("localhost", 7777));
/** 将socketChannel通道注册到selector选择器中,并设置感兴趣的事件OP_CONNECT,并返回SelectionKey**/
socketChannel.register(selector, SelectionKey.OP_CONNECT);
boolean is_Run = true;
while (is_Run) {
/** 阻塞等待事件到达**/
selector.select();
/** 获取到达事件SelectionKey集合**/
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
/** 遍历SelectionKey**/
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
/** 将获取SelectionKey 从publicSelectedKeys集合中删除,防止重复处理**/
iterator.remove();
/** 判断OP_CONNECT事件是否到达 **/
if (key.isConnectable()) {
/** 从SelectionKey获取对应通道socketChannel**/
socketChannel = (SocketChannel) key.channel();
while (!socketChannel.finishConnect()){
}
/** 将serverSocketChannel通道注册到selector选择器中,并设置感兴趣的事件OP_READ(当客户端请求数据时事件到达被添加到selectedKeys集合中) **/
socketChannel.register(selector, SelectionKey.OP_READ);
/** 向服务端发送消息 **/
socketChannel.write(ByteBuffer.wrap("hello server".getBytes()));
}
/** 判断OP_READ事件是否到达 **/
else if (key.isReadable()) {
/** 从SelectionKey获取对应通道socketChannel**/
socketChannel = (SocketChannel) key.channel();
/**读取服务端发送消息 **/
ByteBuffer byteBuffer = ByteBuffer.allocate(100);
int read = socketChannel.read(byteBuffer);
System.out.println(new String(byteBuffer.array()));
}
}
}
}
阻塞IO VS 非阻塞IO+ Selector
从流程上来看,IO多路复用需要将通道注册到选择器上,通过选择器监听事件。用户通过select()阻塞当前线程等待事件到达。当事件到达后调用对应函数处理,如读取事件到达调用read()将数据从内核缓冲区拷贝到用户缓冲区。相对于阻塞IO来说进行了两个函数调用。同时也阻塞效率更差。
但是这个选择器作用在于可以用一个线程监听多个socket连接。只有当建立连接的socket发出请求时才会创建一个线程来处理。对于不活跃的连接并不会创建一个线程来处理。这样大大提供了CPU使用率。解决阻塞IO中单连接单线程1:1的问题。
服务端好比一个餐厅。刚开始时餐厅只有老板一个人作为服务员。每当来一个客户进入餐厅老板都会亲自为客户服务需要自己处理,直到客户离开。由于老板服务很周到,餐厅的客户越来越多,老板不得不开始雇佣服务员为更多客户同时服务。而这种方式在高峰时段老板不得 不雇佣超过成本上线得员工。老板为了控制成本,决定只雇佣一定数量员工。如果员工数量忙不过来只能让客户等待。时间长了老板发现起始很多客户大多数时间只有在点餐和加餐时候需要员工服务。大多数时候这种1对1得模式员工只是等待客户提需求并没有任何工作。因此老板购买了一个点餐系统。但客户进餐厅时,会给客户分配一个按铃。当客户需要点餐时。就可以点击这时会指派一名员工为客户服务。
网友评论