day20
复习
NIO
1.Buffer --> ByteBuffer
构造方法:
public static allocate(int 字节数);
public static allocatDirect(int 字节数);
public static wrap(byte[] bs);
成员方法:
put(byte b/byte[] bs/byte[] bs,int startIndex,int len); //向缓冲区添加数据
position([int newPosition]);//获取或者修改当前索引
limit([int newLimit]);//获取或者修改当前限制
capacity(); //获取缓冲区的容量
clear();//清空缓冲区
flip();//切换模式
mark(); //记录当前的postion
reset();//将当前的position更改到mark的位置
...
2.Chanel
FileChinnel 文件通道
SocketChannel 客户端通道
SocketChannel socketChannel = SocketChannel.open();
//默认是阻塞式连接
//可以通过以下代码修改
socketChannel.configureBlocking(false);//修改为非阻塞连接
socketChannel.connect(new InetSockerAddress("服务器IP",服务器端口号));
ServerSocketChannel 服务器通道
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(服务器自己运行的端口号));
//默认也是阻塞的
//可以通过以下代码修改
serverSocketChannel.configureBlocking(false);//修改为非阻塞连接
SocketChannel socketChannel = serverSocketChannel.accept(); //接收客户端通道
今日内容
- NIO的Buffer,Channel,Selector(选择器,多路复用器)
- AIO是异步非阻塞IO
Select(选择器,多路复用器)
多路复用的概念
多路: 多个服务器分别去监听多个端口!
如果多路不复用,每个服务器都需要开启一个线程(在高并发下性能较差)
如果多路复用,可以把多个服务器注册到一个Selector(选择器)上,只需要开启一个线程即可处理这些服务器(在高并发性能较高)
选择器Selector
-
什么是Selector
Selector称为选择器,也叫多路复用器,他可以让多个Channel注册给他,然后监听各个Channel发生的事件
-
Selector的创建API
创建Selector的方式
Selector selector = Selector.open();
-
Channel注册到Selector的API
b.将我们要交给选择器管理的通道注册到选择器里
channel是一个通道,并且必须是非阻塞的通道
SelectionKey key = channel.register(selector,SelectionKey.OP_ACCEPT);
参数1: 该通道注册到的选择器对象
参数2: 表示我们选择器对何种事件感兴趣(服务器通道只能写SelectionKey.OP_ACCEPT,表示有客户端连接)返回值: SelectionKey是对象,该对象中包含注册到选择器的通道
-
代码演示
public class SelectorDemo01 { public static void main(String[] args) throws IOException { //1.多路(多个服务器监听多个端口) ServerSocketChannel server1 = ServerSocketChannel.open(); server1.configureBlocking(false); //一定要改为非阻塞 server1.bind(new InetSocketAddress(7777)); ServerSocketChannel server2 = ServerSocketChannel.open(); server2.configureBlocking(false); //一定要改为非阻塞 server2.bind(new InetSocketAddress(8888)); ServerSocketChannel server3 = ServerSocketChannel.open(); server3.configureBlocking(false); //一定要改为非阻塞 server3.bind(new InetSocketAddress(9999)); //2.获取Selector选择器对象 Selector selector = Selector.open(); //3.将多个server注册到同一个Selector上 se,那么rver1.register(selector, SelectionKey.OP_ACCEPT); server2.register(selector, SelectionKey.OP_ACCEPT); server3.register(selector, SelectionKey.OP_ACCEPT); } }
Selector中的常用方法
-
Selector的
keys()
方法此方法返回一个Set集合,表示:已注册通道的集合。每个已注册通道封装为一个SelectionKey对象。表示返回的是所有注册到选择器的通道集合(这时的通道对象Channel是已经封装成SelectionKey)
-
Selector的
selectedKeys()
方法此方法返回一个Set集合,表示:当前已连接的所有通道的集合。每个已连接通道同一封装为一个SelectionKey对象。需要循环, 否则可能会出现, 刚连接上一个就返回了连接的通道集合. SelectionKye类就是Channel通道封装之后的类
-
Selector的
select()
方法- 此方法会阻塞,直到有至少1个客户端连接(只要有一个客户端连接之后便不会再阻塞)。
- 此方法会返回一个int值(返回的是本次连接数目, 有可能是刚有一个连接就返回了),表示有几个客户端连接了服务器。
Selector实现多路连接(上)
public class SelectorDemo02 {
public static void main(String[] args) throws IOException {
//1.多路(多个服务器监听多个端口)
ServerSocketChannel server1 = ServerSocketChannel.open();
server1.configureBlocking(false); //一定要改为非阻塞
server1.bind(new InetSocketAddress(7777));
ServerSocketChannel server2 = ServerSocketChannel.open();
server2.configureBlocking(false); //一定要改为非阻塞
server2.bind(new InetSocketAddress(8888));
ServerSocketChannel server3 = ServerSocketChannel.open();
server3.configureBlocking(false); //一定要改为非阻塞
server3.bind(new InetSocketAddress(9999));
//2.获取Selector选择器对象
Selector selector = Selector.open();
//3.将多个server注册到同一个Selector上
server1.register(selector, SelectionKey.OP_ACCEPT);
server2.register(selector, SelectionKey.OP_ACCEPT);
server3.register(selector, SelectionKey.OP_ACCEPT);
//4.接收客户端连接
Set<SelectionKey> keys = selector.keys();//获取已注册通道的集合
System.out.println("注册通道数量:" + keys.size());
Set<SelectionKey> selectionKeys = selector.selectedKeys();//获取已连接通 道的集合
System.out.println("已连接通道数量:" + selectionKeys.size());
System.out.println("----------------------------------------------");
System.out.println("【服务器】等待连接......");
int selectedCount = selector.select();//此方法会"阻塞"
System.out.println("本次连接数量:" + selectedCount);
System.out.println("----------------------------------------------");
Set<SelectionKey> keys1 = selector.keys();
System.out.println("注册通道数量:" + keys1.size());
Set<SelectionKey> selectionKeys1 = selector.selectedKeys();
System.out.println("已连接的通道数量:" + selectionKeys1.size());
}
}
public class SocketChannelDemo {
public static void main(String[] args) {
new Thread(() -> {
try (SocketChannel socket = SocketChannel.open()) {
System.out.println("7777客户端连接服务器......");
socket.connect(new InetSocketAddress("127.0.0.1", 7777));
System.out.println("7777客户端连接成功....");
} catch (IOException e) {
System.out.println("7777异常重连");
}
}).start();
new Thread(() -> {
try (SocketChannel socket = SocketChannel.open()) {
System.out.println("8888客户端连接服务器......");
socket.connect(new InetSocketAddress("127.0.0.1", 8888));
System.out.println("8888客户端连接成功....");
} catch (IOException e) {
System.out.println("8888异常重连");
}
}).start();
new Thread(() -> {
try (SocketChannel socket = SocketChannel.open()) {
System.out.println("9999客户端连接服务器......");
socket.connect(new InetSocketAddress("127.0.0.1", 9999));
System.out.println("9999客户端连接成功....");
} catch (IOException e) {
System.out.println("9999异常重连");
}
}).start();
}
}
Selector实现多路连接(下)
public class SelectorDemo02 {
public static void main(String[] args) throws IOException, InterruptedException {
//1.多路(多个服务器监听多个端口)
ServerSocketChannel server1 = ServerSocketChannel.open();
server1.configureBlocking(false); //一定要改为非阻塞
server1.bind(new InetSocketAddress(7777));
ServerSocketChannel server2 = ServerSocketChannel.open();
server2.configureBlocking(false); //一定要改为非阻塞
server2.bind(new InetSocketAddress(8888));
ServerSocketChannel server3 = ServerSocketChannel.open();
server3.configureBlocking(false); //一定要改为非阻塞
server3.bind(new InetSocketAddress(9999));
//2.获取Selector选择器对象
Selector selector = Selector.open();
//3.将多个server注册到同一个Selector上
server1.register(selector, SelectionKey.OP_ACCEPT);
server2.register(selector, SelectionKey.OP_ACCEPT);
server3.register(selector, SelectionKey.OP_ACCEPT);
//4.接收客户端连接
Set<SelectionKey> keys = selector.keys();//获取已注册通道的集合
System.out.println("注册通道数量:" + keys.size());
Set<SelectionKey> selectionKeys = selector.selectedKeys();//获取已连接通 道的集合
System.out.println("已连接通道数量:" + selectionKeys.size());
System.out.println("----------------------------------------------");
while (true) {
System.out.println("【服务器】等待连接......");
int selectedCount = selector.select();//此方法会"阻塞"
System.out.println("本次连接数量:" + selectedCount);
System.out.println("----------------------------------------------");
Set<SelectionKey> keys1 = selector.keys();
System.out.println("注册通道数量:" + keys1.size());
Set<SelectionKey> selectionKeys1 = selector.selectedKeys();
System.out.println("已连接的通道数量:" + selectionKeys1.size());
System.out.println("============处理被连接的服务器通道============");
//遍历已连接通道的集合
Iterator<SelectionKey> it = selectionKeys1.iterator();
while (it.hasNext()) {
//获取当前连接通道的
SelectionKey key = it.next();
//从SelectionKey中获取通道对象
ServerSocketChannel channel = (ServerSocketChannel) key.channel();
//看一下此通道是监听哪个端口的
System.out.println("监听端口:" + channel.getLocalAddress());
//取出连接到该服务器的客户端通道
SocketChannel accept = channel.accept();
System.out.println(accept);
System.out.println("写与该客户端进行交互的代码....");
//从连接的通道中把已经处理过的服务器通道移出
it.remove();
}
System.out.println("休息5秒......");
Thread.sleep(5000);
System.out.println();//打印一个空行
}
}
}
public class SocketChannelDemo {
public static void main(String[] args) {
new Thread(() -> {
try (SocketChannel socket = SocketChannel.open()) {
System.out.println("7777客户端连接服务器......");
socket.connect(new InetSocketAddress("127.0.0.1", 7777));
System.out.println("7777客户端连接成功....");
} catch (IOException e) {
System.out.println("7777异常重连");
}
}).start();
new Thread(() -> {
try (SocketChannel socket = SocketChannel.open()) {
System.out.println("8888客户端连接服务器......");
socket.connect(new InetSocketAddress("127.0.0.1", 8888));
System.out.println("8888客户端连接成功....");
} catch (IOException e) {
System.out.println("8888异常重连");
}
}).start();
new Thread(() -> {
try (SocketChannel socket = SocketChannel.open()) {
System.out.println("9999客户端连接服务器......");
socket.connect(new InetSocketAddress("127.0.0.1", 9999));
System.out.println("9999客户端连接成功....");
} catch (IOException e) {
System.out.println("9999异常重连");
}
}).start();
}
}
Selector多路信息接收测试
public class SelectorDemo02 {
public static void main(String[] args) throws IOException, InterruptedException {
//1.多路(多个服务器监听多个端口)
ServerSocketChannel server1 = ServerSocketChannel.open();
server1.configureBlocking(false); //一定要改为非阻塞
server1.bind(new InetSocketAddress(7777));
ServerSocketChannel server2 = ServerSocketChannel.open();
server2.configureBlocking(false); //一定要改为非阻塞
server2.bind(new InetSocketAddress(8888));
ServerSocketChannel server3 = ServerSocketChannel.open();
server3.configureBlocking(false); //一定要改为非阻塞
server3.bind(new InetSocketAddress(9999));
//2.获取Selector选择器对象
Selector selector = Selector.open();
//3.将多个server注册到同一个Selector上
server1.register(selector, SelectionKey.OP_ACCEPT);
server2.register(selector, SelectionKey.OP_ACCEPT);
server3.register(selector, SelectionKey.OP_ACCEPT);
//4.接收客户端连接
Set<SelectionKey> keys = selector.keys();//获取已注册通道的集合
System.out.println("注册通道数量:" + keys.size());
Set<SelectionKey> selectionKeys = selector.selectedKeys();//获取已连接通 道的集合
System.out.println("已连接通道数量:" + selectionKeys.size());
System.out.println("----------------------------------------------");
while (true) {
System.out.println("【服务器】等待连接......");
int selectedCount = selector.select();//此方法会"阻塞"
System.out.println("本次连接数量:" + selectedCount);
System.out.println("----------------------------------------------");
Set<SelectionKey> keys1 = selector.keys();
System.out.println("注册通道数量:" + keys1.size());
Set<SelectionKey> selectionKeys1 = selector.selectedKeys();
System.out.println("已连接的通道数量:" + selectionKeys1.size());
System.out.println("============处理被连接的服务器通道============");
//遍历已连接通道的集合
Iterator<SelectionKey> it = selectionKeys1.iterator();
while (it.hasNext()) {
//获取当前连接通道的
SelectionKey key = it.next();
//从SelectionKey中获取通道对象
ServerSocketChannel channel = (ServerSocketChannel) key.channel();
//看一下此通道是监听哪个端口的
System.out.println("监听端口:" + channel.getLocalAddress());
//取出连接到该服务器的客户端通道
SocketChannel accept = channel.accept();
System.out.println("写与该客户端进行交互的代码....");
//接收客户端发送过来的信息
ByteBuffer inBuf = ByteBuffer.allocate(1000);
accept.read(inBuf);
inBuf.flip();
String msg = new String(inBuf.array(), 0, inBuf.limit());
System.out.println("【服务器】接收到通道【" + channel.getLocalAddress() + "】的信息:" + msg);
//从连接的通道中把已经处理过的服务器通道移出
it.remove();
}
System.out.println("休息5秒......");
Thread.sleep(5000);
System.out.println();//打印一个空行
}
}
}
AIO(异步,非阻塞)
AIO概述和分类
-
AIO: ASynchronized 异步非阻塞的IO
-
AIO的分类:
-
异步的客户端通道 AsynchronousSocketChannel
-
异步的服务器通道 AsynchronousServerSocketChannel
-
异步的文件通道 AsynchronousFileChannel
-
异步的UDP协议通道 AsynchronousDatagramChannel
-
-
AIO的异步:
- 表现在两个方面:
- 连接时: 可以使用异步(调用连接的方法时, 非阻塞, 连接成功之后会以方法回调的机制通知我们)
- 读取数据时: 可以使用异步(调用read方法时, 非阻塞, 等数据接收到之后以方法调用的机制通知我们)
- 表现在两个方面:
AIO的异步非阻塞连接的建立
- 异步非阻塞的客户端通道建立
/**
* AIO下的 异步客户端通道
*/
public class AIOSocketChannelDemo01 {
public static void main(String[] args) throws IOException {
//1.创建 异步的客户端通道
AsynchronousSocketChannel socketChannel = AsynchronousSocketChannel.open();
//2.去连接服务器,采用异步非阻塞方法
//connect(服务器的IP和端口号,附件(null),接口);
socketChannel.connect(new InetSocketAddress("127.0.0.1", 8888), null, new CompletionHandler<Void, Object>() {
@Override
public void completed(Void result, Object attachment) {
System.out.println("连接服务器成功...");
}
@Override
public void failed(Throwable exc, Object attachment) {
System.out.println("连接服务器失败...");
}
});
System.out.println("程序继续执行....");
while (true) {
}
}
}
- 异步非阻塞的服务器通道建立
/**
* AIO下的 异步服务器端通道
*/
public class AIOServerSocketChannelDemo01 {
public static void main(String[] args) throws IOException, InterruptedException {
//1.创建一个异步的服务器通道
AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open();
//2.绑定本地某个端口
serverSocketChannel.bind(new InetSocketAddress(8888));
//3.接收异步客户端,采用异步非阻塞方式
//accpet(附件(nulll),接口);
serverSocketChannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {
@Override
public void completed(AsynchronousSocketChannel result, Object attachment) {
System.out.println("有客户端连接....");
}
@Override
public void failed(Throwable exc, Object attachment) {
System.out.println("客户端连接失败...");
}
});
System.out.println("程序继续执行..");
while (true) {
}
}
}
- 异步非阻塞建立连接
见AIO异步客户端和AIO异步服务器端建立
AIO同步读写数据
/**
* AIO下的 异步客户端通道
*/
public class AIOSocketChannelDemo01 {
public static void main(String[] args) throws IOException, InterruptedException {
//1.创建 异步的客户端通道
AsynchronousSocketChannel socketChannel = AsynchronousSocketChannel.open();
//2.去连接服务器,采用异步非阻塞方法
//connect(服务器的IP和端口号,附件(null),接口);
socketChannel.connect(new InetSocketAddress("127.0.0.1", 8888), null, new CompletionHandler<Void, Object>() {
@Override
public void completed(Void result, Object attachment) {
System.out.println("连接服务器成功...");
//客户端给服务器发送数据
ByteBuffer buffer = ByteBuffer.allocate(1024);
buffer.put("你好我是AIO客户端...".getBytes());
//切换读写模式
buffer.flip();
socketChannel.write(buffer);
//释放资源
try {
socketChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void failed(Throwable exc, Object attachment) {
System.out.println("连接服务器失败...");
}
});
System.out.println("程序继续执行....");
}
}
/**
* AIO下的 异步服务器端通道
*/
public class AIOServerSocketChannelDemo01 {
public static void main(String[] args) throws IOException, InterruptedException {
//1.创建一个异步的服务器通道
AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open();
//2.绑定本地某个端口
serverSocketChannel.bind(new InetSocketAddress(8888));
//3.接收异步客户端,采用异步非阻塞方式
//accpet(附件(nulll),接口);
serverSocketChannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {
@Override
public void completed(AsynchronousSocketChannel socketChannel, Object attachment) {
System.out.println("有客户端连接....");
//去读客户端发送来的数据
ByteBuffer buffer = ByteBuffer.allocate(1024);
Future<Integer> future = socketChannel.read(buffer);
//
try {
byte[] array = buffer.array();
System.out.println(Arrays.toString(array));
Integer len = future.get();
System.out.println(len);
//当调用future的get方法时,数据才写入到buffer中
//所以我们不能在调用get方法之前,调用flip,否则数据将无法写入
buffer.flip();
System.out.println(new String(array,0,len));
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void failed(Throwable exc, Object attachment) {
System.out.println("客户端连接失败...");
}
});
System.out.println("程序继续执行..");
}
}
AIO异步读写数据
/**
* AIO下的 异步客户端通道
*/
public class AIOSocketChannelDemo01 {
public static void main(String[] args) throws IOException, InterruptedException {
//1.创建 异步的客户端通道
AsynchronousSocketChannel socketChannel = AsynchronousSocketChannel.open();
//2.去连接服务器,采用异步非阻塞方法
//connect(服务器的connectIP和端口号,附件(null),接口);
socketChannel.connect(new InetSocketAddress("127.0.0.1", 8888), null, new CompletionHandler<Void, Object>() {
@Override
public void completed(Void result, Object attachment) {
System.out.println("连接服务器成功...");
//给服务器发送数据
ByteBuffer buffer = ByteBuffer.allocate(1000);
buffer.put("你好我是AIO客户端..".getBytes());
buffer.flip();
//异步的write(缓冲区,超时时间,时间单位,附件(null),回调接口);
socketChannel.write(buffer, 10, TimeUnit.SECONDS, null, new CompletionHandler<Integer, Object>() {
@Override
public void completed(Integer result, Object attachment) {
System.out.println("数据成功发送...");
//释放资源
try {
socketChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void failed(Throwable exc, Object attachment) {
System.out.println("数据发送失败...");
}
});
}
@Override
public void failed(Throwable exc, Object attachment) {
System.out.println("连接服务器失败...");
}
});
System.out.println("程序继续执行....");
Thread.sleep(5000);
}
}
/**
* AIO下的 异步服务器端通道
*/
public class AIOServerSocketChannelDemo01 {
public static void main(String[] args) throws IOException, InterruptedException {
//1.创建一个异步的服务器通道
AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open();
//2.绑定本地某个端口
serverSocketChannel.bind(new InetSocketAddress(8888));
//3.接收异步客户端,采用异步非阻塞方式
//accpet(附件(nulll),接口);
serverSocketChannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {
@Override
public void completed(AsynchronousSocketChannel socketChannel, Object attachment) {
System.out.println("有客户端连接....");
//从客户端中读取数据
//异步的read(字节缓冲区,超时时间,时间单位,附件(null),回调接口)
ByteBuffer buffer = ByteBuffer.allocate(1000);
socketChannel.read(buffer, 10, TimeUnit.SECONDS, null, new CompletionHandler<Integer, Object>() {
@Override
public void completed(Integer result, Object attachment) {
System.out.println("数据读取完毕..");
System.out.println("接收到数据的长度:"+result);
System.out.println("接收到的数据是:" + new String(buffer.array(), 0, result));
//释放资源
try {
socketChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void failed(Throwable exc, Object attachment) {
System.out.println("读取数据失败...");
}
});
}
@Override
public void failed(Throwable exc, Object attachment) {
System.out.println("客户端连接失败...");
}
});
System.out.println("程序继续执行..");
while (true) {
Thread.sleep(1000);
}
}
}
今日总结
能够说出Selector选择器的作用
Selector可以让多个服务器注册到它上,完成多路复用的功能!!
能够使用Selector选择器
注册:
通道.register(selector,SelectionKey.OP_ACCEPT);
方法:
Set<SelectionKey> keys = selector.selectedKeys(); //获取被连接的服务器通道的集合
Set<SelectionKey> keys = selector.keys();//获取所有注册到选择器的服务器通道集合
int count = selector.select();//获取本次被客户端通道连接的服务器通道的数量
能够说出AIO的特点
特点:
a.支持异步非阻塞连接 connect accept
b.支持异步非阻塞的读写数据
socketChannel.write(....);
socketChannel.read(....);
今日没有作业:
a.前面知识学得不好的,复习前面的内容(Map遍历,斗地主案例,学生管理系统,TCP双向通信)
b.前面知识嗷嗷的,没问题,你可以研究NIO和AIO
网友评论