一、阻塞与非阻塞
1.1 阻塞
1.1.1 阻塞模式会存在哪些问题?
1)在阻塞模式下,以下的方法都会导致线程暂停
- ServerSocketChannel.accept 会在没有连接建立时让线程暂停
- SocketChannel.read 会在没有数据可读时让线程暂停
- 阻塞的表现其实就是线程暂停了,暂停期间不会占用 cpu,但线程处于闲置状态
2)单线程下,阻塞方法之间相互影响,几乎不能正常工作,需要多线程支持
3)多线程下,有新的问题,体现在以下方面
-
32 位 jvm 一个线程最大堆栈是 320k,64 位 jvm 一个线程 最大堆栈是1024k,如果连接数过多,必然导致 OOM,并且线程太多,反而会因为频繁上下文切换导致性能降低。
-
可以采用线程池技术来减少线程数和线程上下文切换,但治标不治本,如果有很多连接建立,但长时间 inactive(不活跃),会阻塞线程池中所有线程,因此不适合长连接,只适合短连接
1.1.2 测试代码:
服务端代码:
public class BioServerTest {
public static void main(String[] args) throws IOException {
// 使用 nio 来理解阻塞模式, 单线程
// 0. ByteBuffer
ByteBuffer buffer = ByteBuffer.allocate(16);
// 1. 创建了服务器
ServerSocketChannel ssc = ServerSocketChannel.open();
// 2. 绑定监听端口
ssc.bind(new InetSocketAddress(8080));
// 3. 连接集合
List<SocketChannel> channels = new ArrayList<>();
while (true) {
// 4. accept 建立与客户端连接, SocketChannel 用来与客户端之间通信
System.out.println("connecting...");
// 阻塞方法,线程停止运行
SocketChannel sc = ssc.accept();
System.out.println("connected... " + sc);
channels.add(sc);
for (SocketChannel channel : channels) {
// 5. 接收客户端发送的数据
System.out.println("before read..."+channel);
try {
// 阻塞方法,线程停止运行
channel.read(buffer);
} catch (IOException e) {
}
buffer.flip();
System.out.println(print(buffer));
buffer.clear();
System.out.println("after read..."+channel);
}
}
}
static String print(ByteBuffer b) {
StringBuilder stringBuilder = new StringBuilder();
for (int i = 0; i < b.limit(); i++) {
stringBuilder.append((char) b.get(i));
}
return stringBuilder.toString();
}
}
客户端代码:
public class SocketClientTest{
public static void main(String[] args) throws Exception {
SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress("localhost", 8080));
System.out.println("waiting...");
//模拟长连接一直存在
while (true) {
}
}
}
启动服务端看结果,一直在connecting,此时线程阻塞了:
connecting...
启动客户端看结果,此时连接成功,又阻塞到收到消息之前:
connecting...
connected... java.nio.channels.SocketChannel[connected local=/127.0.0.1:8080 remote=/127.0.0.1:64800]
before read...java.nio.channels.SocketChannel[connected local=/127.0.0.1:8080 remote=/127.0.0.1:64800]
1.2 非阻塞
1.2.1 相比阻塞改变了什么?
非阻塞模式下,相关方法都不会让线程暂停
- 在 ServerSocketChannel.accept 在没有连接建立时,会返回 null,继续运行
- SocketChannel.read 在没有数据可读时,会返回 0,但线程不必阻塞,可以去执行其它 SocketChannel 的 read 或是去执行 ServerSocketChannel.accept
- 写数据时,线程只是等待数据写入 Channel 即可,无需等 Channel 通过网络把数据发送出去
1.2.2 非阻塞模型存在哪些问题?
1)但非阻塞模式下,即使没有连接建立,和可读数据,线程仍然在不断运行,白白浪费了 cpu
2)数据复制过程中,线程实际还是阻塞的(AIO 改进的地方)
1.2.3 测试代码
服务端代码:
public class NonIoServerTest {
public static void main(String[] args) throws IOException {
// 使用 nio 来理解非阻塞模式, 单线程
// 0. ByteBuffer
ByteBuffer buffer = ByteBuffer.allocate(16);
// 1. 创建了服务器
ServerSocketChannel ssc = ServerSocketChannel.open();
// 非阻塞模式
ssc.configureBlocking(false);
// 2. 绑定监听端口
ssc.bind(new InetSocketAddress(8080));
// 3. 连接集合
List<SocketChannel> channels = new ArrayList<>();
while (true) {
// 4. accept 建立与客户端连接, SocketChannel 用来与客户端之间通信
// 非阻塞,线程还会继续运行,如果没有连接建立,但sc是null
SocketChannel sc = ssc.accept();
if (sc != null) {
System.out.println("connected... " + sc);
// 非阻塞模式
sc.configureBlocking(false);
channels.add(sc);
}
for (SocketChannel channel : channels) {
// 5. 接收客户端发送的数据
// 非阻塞,线程仍然会继续运行,如果没有读到数据,read 返回 0
int read = channel.read(buffer);
if (read > 0) {
buffer.flip();
System.out.println(print(buffer));
buffer.clear();
System.out.println("after read..."+channel);
}
}
// 用于查非阻塞状态,连接客户端时可关闭,便于观看结果
System.out.println("wait connecting...");
}
}
static String print(ByteBuffer b) {
StringBuilder stringBuilder = new StringBuilder();
for (int i = 0; i < b.limit(); i++) {
stringBuilder.append((char) b.get(i));
}
return stringBuilder.toString();
}
}
启动服务端结果如下,不断刷新:
wait connecting...
wait connecting...
wait connecting...
wait connecting...
wait connecting...
wait connecting...
wait connecting...
wait connecting...
wait connecting...
... ...
服务端与前面测试阻塞时一样,我们将服务端的System.out.println("wait connecting...");这行代码注释掉,方便看结果。
启动客户端,看服务端结果:
connected... java.nio.channels.SocketChannel[connected local=/127.0.0.1:8080 remote=/127.0.0.1:61254]
二、多路复用
单线程可以配合 Selector 完成对多个 Channel 可读写事件的监控,这称之为多路复用。
-
多路复用仅针对网络 IO,普通文件 IO 没法利用多路复用
-
如果使用非阻塞模式,而不使用selector,则线程大部分时间都在做无用功,使用Selector 能够保证以下三点:
- 有可连接事件时才去连接
- 有可读事件才去读取
- 有可写事件才去写入( 限于网络传输能力,Channel 未必时时可写,一旦 Channel 可写,会触发 Selector 的可写事件)
2.1 Selector
image.png上述方案的好处:
- 一个线程配合 selector 就可以监控多个 channel 的事件,事件发生线程才去处理。避免非阻塞模式下所做无用功。
- 让这个线程能够被充分利用
- 节约了线程的数量
- 减少了线程上下文切换
2.1.1 如何使用Selector?
如下代码及注释描述:
public class SelectorTest {
public static void main(String[] args) throws IOException {
// 创建Selector
Selector selector = Selector.open();
// 绑定channel事件(SelectionKey当中有四种事件:OP_ACCEPT,OP_CONNECT, OP_READ, OP_WRITE)
SocketChannel channel = SocketChannel.open();
channel.configureBlocking(false);
SelectionKey selectionKey = channel.register(selector, SelectionKey.OP_ACCEPT);
// 监听channel事件,返回值是发生事件的channel数
// 监听方法1,阻塞直到绑定事件发生
int count1 = selector.select();
// 监听方法2,阻塞直到绑定事件发生,或是超时(时间单位为 ms)
int count2 = selector.select(1000);
// 监听方法3,不会阻塞,也就是不管有没有事件,立刻返回,自己根据返回值检查是否有事件
int count3 = selector.selectNow();
}
}
上述代码当中,在selector进行channel时间监听时,会发生阻塞,直到时间发生,那么有哪些情况会使线程变成不阻塞状态呢?如下所示:
1)事件发生时(SelectionKey当中有四种事件:OP_ACCEPT,OP_CONNECT, OP_READ, OP_WRITE)
- 客户端发起连接请求,会触发 accept 事件
- 客户端发送数据过来,客户端正常、异常关闭时,都会触发 read 事件,另外如果发送的数据大于 buffer 缓冲区,会触发多次读取事件
- channel 可写,会触发 write 事件
- 在 linux 下 nio bug 发生时
2)调用 selector.wakeup()
3)调用 selector.close()
4)selector 所在线程 interrupt
2.2 处理accept事件
服务端如下所示:
public class AcceptEventServerTest {
public static void main(String[] args) {
try {
ServerSocketChannel channel = ServerSocketChannel.open();
channel.bind(new InetSocketAddress(8080));
System.out.println(channel);
Selector selector = Selector.open();
channel.configureBlocking(false);
channel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
int count = selector.select();
// int count = selector.selectNow();
System.out.println("select count: " + count);
// if(count <= 0) {
// continue;
// }
// 获取所有事件
Set<SelectionKey> keys = selector.selectedKeys();
// 遍历所有事件,逐一处理
Iterator<SelectionKey> iter = keys.iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
// 判断事件类型
if (key.isAcceptable()) {
ServerSocketChannel c = (ServerSocketChannel) key.channel();
// 必须处理
SocketChannel sc = c.accept();
System.out.println(sc);
}
// 处理完毕,必须将事件移除
iter.remove();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
客户端如下:
public class ClientTest {
public static void main(String[] args) {
// accept事件
try (Socket socket = new Socket("localhost", 8080)) {
System.out.println(socket);
// read事件
socket.getOutputStream().write("world".getBytes());
System.in.read();
} catch (IOException e) {
e.printStackTrace();
}
}
}
服务端打印结果:
sun.nio.ch.ServerSocketChannelImpl[/0:0:0:0:0:0:0:0:8080]
select count: 1
java.nio.channels.SocketChannel[connected local=/127.0.0.1:8080 remote=/127.0.0.1:60469]
上述代码中服务端注释掉了使用selector.selectNow()的方法,如果使用该方法,需要自己去判断返回值是否为0。
事件发生后,要么处理,要么取消(cancel),不能什么都不做,否则下次该事件仍会触发,这是因为 nio 底层使用的是水平触发。
2.3 处理read事件
此处仍然使用代码的方式讲解,客户端与前面的客户端相同,只是此处会同时启动两个客户端,其中发送的内容分别是“hello” 和 “world”。
服务端代码如下所示:
public class ReadEventServerTest {
public static void main(String[] args) {
try {
ServerSocketChannel channel = ServerSocketChannel.open();
channel.bind(new InetSocketAddress(8080));
System.out.println(channel);
Selector selector = Selector.open();
channel.configureBlocking(false);
channel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
int count = selector.select();
System.out.println("select count:" + count);
// 获取所有事件
Set<SelectionKey> keys = selector.selectedKeys();
// 遍历所有事件,逐一处理
Iterator<SelectionKey> iter = keys.iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
// 判断事件类型
if (key.isAcceptable()) {
ServerSocketChannel c = (ServerSocketChannel) key.channel();
// 必须处理
SocketChannel sc = c.accept();
sc.configureBlocking(false);
sc.register(selector, SelectionKey.OP_READ);
System.out.println("连接已建立:" + sc);
} else if (key.isReadable()) {
SocketChannel sc = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(128);
int read = sc.read(buffer);
if (read == -1) {
key.cancel();
sc.close();
} else {
buffer.flip();
System.out.println(print(buffer));
}
}
// 处理完毕,必须将事件移除
iter.remove();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
static String print(ByteBuffer b) {
StringBuilder stringBuilder = new StringBuilder();
for (int i = 0; i < b.limit(); i++) {
stringBuilder.append((char) b.get(i));
}
return stringBuilder.toString();
}
}
启动服务端,并先后启动两个客户端,看结果,首先服务端channel自己注册到selector,客户端1发送accept事件,服务端接收到后,继续while循环,监听到read事件,打印内容为“hello”,客户端2步骤相同。
sun.nio.ch.ServerSocketChannelImpl[/0:0:0:0:0:0:0:0:8080]
select count:1
连接已建立:java.nio.channels.SocketChannel[connected local=/127.0.0.1:8080 remote=/127.0.0.1:61693]
select count:1
hello
select count:1
连接已建立:java.nio.channels.SocketChannel[connected local=/127.0.0.1:8080 remote=/127.0.0.1:51495]
select count:1
world
注意:最后的iter.remove(),为什么要移除?
因为 select 在事件发生后,就会将相关的 key 放入 selectedKeys 集合,但不会在处理完后从 selectedKeys 集合中移除,需要我们自己编码删除。例如
- 第一次触发了 ssckey 上的 accept 事件,没有移除 ssckey
- 第二次触发了 ssckey 上的 read 事件,但这时 selectedKeys 中还有上次的 ssckey ,在处理时因为没有真正的 serverSocket 连上了,就会导致空指针异常
上述代码中cancel有什么作用?
cancel 会取消注册在 selector 上的 channel,并从 keys 集合中删除 key 后续不会再监听事件
2.3.1 关注消息边界
首先看如下的代码是否有问题,客户端代码如下:
public class ServerTest {
public static void main(String[] args) throws IOException {
ServerSocket ss = new ServerSocket(9000);
while (true) {
Socket s = ss.accept();
InputStream in = s.getInputStream();
// 每次读取4个字节
byte[] arr = new byte[4];
while (true) {
int read = in.read(arr);
// 读取长度是-1,则不读取了
if (read == -1) {
break;
}
System.out.println(new String(arr, 0, read));
}
}
}
}
服务端代码如下:
public class ClientTest {
public static void main(String[] args) throws IOException {
Socket max = new Socket("localhost", 9000);
OutputStream out = max.getOutputStream();
out.write("hello".getBytes());
out.write("world".getBytes());
out.write("你好".getBytes());
out.write("世界".getBytes());
max.close();
}
}
结果:
hell
owor
ld�
�好
世�
��
为什么会产生上述的问题?
这里面涉及到消息边界的问题。消息的长短是不同的,当我们指定相同长度的ByteBuffer去接收消息时,必然存在不同时间段存在很多种情况,如下所示:
由于buffer长度固定,必然存在消息被截断的情况,那么如何解决这些问题呢?
1)一种思路是固定消息长度,数据包大小一样,服务器按预定长度读取,缺点是浪费带宽
2)另一种思路是按分隔符拆分,缺点是效率低
3)TLV 格式,即 Type 类型、Length 长度、Value 数据,类型和长度已知的情况下,就可以方便获取消息大小,分配合适的 buffer,缺点是 buffer 需要提前分配,如果内容过大,则影响 server 吞吐量
- Http 1.1 是 TLV 格式
- Http 2.0 是 LTV 格式
通过面给出的答案都不是最好的解决方案,重点的问题在于如何分配Bytebuffer的大小?
buffer是给一个channel独立使用的,不能被多个channel共同使用,因为存在粘包、半包的问题。
buffer的大小又不能太大,如果要支持很大的连接数,同时又设置很大的buffer,则必然需要庞大的内存。
所以我们需要设置一个大小可变的ByteBuffer。
目前有两种较为简单实现方案,其都有其优缺点:
1)预先分配一个较小的buffer,例如4k,如果发现不能装下全部内容,则创建一个更大的buffer,比如8k,将已写入的4k拷贝到新分配的8kbuffer,将剩下的内容继续写入。
其优点是消息必然是连续的,但是不断的分配和拷贝,必然会对性能造成较大的影响。
2)使用多个数组的形式组成buffer,当一个数组存不下数据内容,就将剩余数据放入下一个数组当中。在netty中的CompositeByteBuf类,就是这种方式。
其缺点是数据不连续,需要再次解析整合,优点是解决了上一个方案的造成性能损耗的问题。
2.4 处理write事件
什么是两阶段策略?
其出现原因有如下两个:
1)在非阻塞模式下,我们无法保证将buffer中的所有数据全部写入channel当中,所以我们需要追踪写入后的返回值,也就是实际写入字节的数值。
int write = channel.write(buffer);
2)我们可以使所有的selector监听channel的可写事件,每个channel都会有一个key用来跟踪buffer,这样会占用过多的内存。(关于此点不太理解的,下面可以通过代码来理解)
鉴于以上问题,出现的两阶段策略:
1)当第一次写入消息时,我们才将channel注册到selector
2)如果第一次没写完,再次添加写事件, 检查 channel 上的可写事件,如果所有的数据写完了,就取消 channel 的注册(不取消则每次都会出现写事件)。
下面通过代码的方式演示:
服务端:
public class ServerTest {
public static void main(String[] args) throws IOException {
// 开启一个服务channel
ServerSocketChannel ssc = ServerSocketChannel.open();
// 设置非阻塞
ssc.configureBlocking(false);
// 绑定端口
ssc.bind(new InetSocketAddress(8080));
//初始化selector
Selector selector = Selector.open();
//注册服务端到selector
ssc.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
//监听事件,此处会阻塞
selector.select();
//获取所有事件key,遍历
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
//不管处理成功与否,移除key
iter.remove();
//如果是建立连接事件
if (key.isAcceptable()) {
//处理accept事件
SocketChannel sc = ssc.accept();
//设置非阻塞
sc.configureBlocking(false);
// 此处是第一阶段
//注册一个read事件到selector
SelectionKey sckey = sc.register(selector, SelectionKey.OP_READ);
// 1. 向客户端发送内容
StringBuilder sb = new StringBuilder();
for (int i = 0; i < 30000000; i++) {
sb.append("a");
}
//字符串转buffer
ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());
//2. 写入数据到客户端channel
int write = sc.write(buffer);
// 3. write 表示实际写了多少字节
System.out.println("实际写入字节:" + write);
// 4. 如果有剩余未读字节,才需要关注写事件
// 此处是第二阶段
if (buffer.hasRemaining()) {
// 在原有关注事件的基础上,多关注 写事件
sckey.interestOps(sckey.interestOps() + SelectionKey.OP_WRITE);
// 把 buffer 作为附件加入 sckey
sckey.attach(buffer);
}
} else if (key.isWritable()) {
// 检索key中的附件buffer
ByteBuffer buffer = (ByteBuffer) key.attachment();
// 获取客户端channel
SocketChannel sc = (SocketChannel) key.channel();
// 根据上次的position继续写
int write = sc.write(buffer);
System.out.println("实际写入字节:" + write);
// 如果写完了,需要将绑定的附件buffer去掉,并且去掉写事件
// 如果没写完将会继续while,执行写事件,知道完成为止
if (!buffer.hasRemaining()) {
key.interestOps(key.interestOps() - SelectionKey.OP_WRITE);
key.attach(null);
}
}
}
}
}
}
客户端:
public class ClientTest {
public static void main(String[] args) throws IOException {
// 开启selector
Selector selector = Selector.open();
// 开启客户端channel
SocketChannel sc = SocketChannel.open();
//设置非阻塞
sc.configureBlocking(false);
// 注册一个连接事件和读事件
sc.register(selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
//连接到服务端
sc.connect(new InetSocketAddress("localhost", 8080));
int count = 0;
while (true) {
//此处监测事件,阻塞
selector.select();
//获取时间key集合,并遍历
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
//无论成功与否,都要移除
iter.remove();
//连接
if (key.isConnectable()) {
System.out.println(sc.finishConnect());
} else if (key.isReadable()) {
//分配内存buffer
ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
//读数据到buffer
count += sc.read(buffer);
//清空buffer
buffer.clear();
//打印总字节数
System.out.println(count);
}
}
}
}
}
分别启动服务端和客户端,结果如下:
实际写入字节:3801059
实际写入字节:3014633
实际写入字节:4063201
实际写入字节:4718556
实际写入字节:2490349
实际写入字节:2621420
实际写入字节:2621420
实际写入字节:2621420
实际写入字节:2621420
实际写入字节:1426522
true
131071
262142
393213
524284
655355
... ...
29753117
29884188
30000000
兄弟们,看到这了就给个赞呗,感谢
网友评论