一、自己理解的概念
nioSocket(即new io socket)是一种同步非阻塞的I/O,其使用buffer来缓存数据和channel来传输数据,使用select来分拣消息。其使用的ServerSocketChannel和SocketChannel对应于之前学习的ServerSocket和Socket。
在我看的书(《看透Spring MVC源代码分析与实践》)中形容为:当今的快递物流,现实中的快递站点不会一件一件地发出快递,而是在每一天的中午或者是一天的下午等时间即等待一段时间后将受到的快件一起发出去,这就像这里要提到的NioSocket的工作原理,送货员就像这里的channel,将一段时间的一批货一起送出去,这一批快件就针对于buffer,到一个中转点分拣员分拣,将不同的货分发给相应的地区相应的快递员,分拣员的身份就相当于selector。
二、与普通socket的区别:
- 数据传输方面:
socket是直接使用输入输出流的方式直接读,当然也可以选择性的放在缓冲数据区中。
nioSocket只能用buffer来进行信息的获取与发出! - 异步特性上面
socket 在连接和读写是都是堵塞状态的,即所在线程必须停住等待连接和数据的读入及写出,如果想在通信的时候可以开辟线程,一个线程对应一个socket连接,这样不仅资源消耗太高,而且线程安全问题也不容小觑!
niosocket可以再连接、读入、写出等待时执行其他代码,就像上面的例子中,快递站点的工作人员不是一直有事,可以再等待快递的时间里干别的事。
三、代码实现
服务器端
package AboutNioSocket.SimpleNioSocket;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;
/**
* @author hsw
* @create 2018-09-11 16:09
*/
public class NioSocketServer {
public static void main(String[] args) throws IOException, InterruptedException {
new NioSocketServer().start();
}
private void start() throws IOException, InterruptedException {
//使用静态open方法,生成一个ServerSocketChannel对象
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
//设置其为非阻塞模式
serverSocketChannel.configureBlocking(false);
//获取有个selector对象
Selector selector = Selector.open();
//注册selector,第二个参数设置了其操作类型
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
//绑定接受请求的端口
serverSocketChannel.bind(new InetSocketAddress(8001));
//循环接受请求
while (true) {
//等待三秒,如果没有请求就select方法返回0,运行else中的需要一部运行的代码
// 如果参数是0或没有参数的话就一直阻塞直到接收到请求
if (selector.select(3000) != 0) {
//selectedKeys方法获取SelectionKey的集合
//SelectionKey保存了请求的channel和selector信息
Set<SelectionKey> keys = selector.selectedKeys();
System.out.println("keys.length is " + keys.size());
Iterator<SelectionKey> iterator = keys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
//获取后移除这个已经处理的请求!
iterator.remove();
//如果该key所在的channel或者selector关闭了,这里就会返回true
//如果是接收请求操作
if (key.isAcceptable()) {
accept(key);
//如果是写操作
} else if (key.isWritable()) {//
write(key);
//如果是读取操作
} else if (key.isReadable()) {
read(key);
}
}
} else {
System.out.println();
Thread.sleep(800);
}
}
}
//nioSocket是通过缓流进行读写操作的,这里先初始化好读写的缓冲流!
private ByteBuffer read = ByteBuffer.allocate(1024);
private ByteBuffer write = ByteBuffer.allocate(1024);
//这是接收到的字符串
private String getStr;
private void accept(SelectionKey key) throws IOException {
//这里的socket还没有注册客户端的channel,所以channel方法是获取ServerSocketChannel的
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
//而ServerSocketChannel的accept方法才是获取连接的channel的!
SocketChannel socketChannel = serverSocketChannel.accept();
//获取selector用的就是selector()方法
Selector selector = key.selector();
//也给其设置非阻塞模式
socketChannel.configureBlocking(false);
//注册服务器端的socket!本地分拣员能为客户端的channel服务了!
socketChannel.register(selector, SelectionKey.OP_READ);
System.out.println("connect successfully");
}
private void write(SelectionKey key) throws IOException {
/*
* clear方法中的内容,准备写入
*
* capacity = 1024 初始化时给的大小就是1024
* limit = capacity
* position = 0
* mark = -1
* */
write.clear();
write.put(getStr.getBytes());
/*
* flip方法中的内容,准备读取
*
* capacity = 1024
* limit = getStr.getBytes()
* position = 0
* mark = -1
* */
write.flip();
// selector已经注册了客户端的channel
// channel()方法获取到的是发送请求的SocketChannel对象
SocketChannel channel = (SocketChannel) key.channel();
channel.configureBlocking(false);
channel.write(write);
Selector selector = key.selector();
//更换下一步的操作类型
channel.register(selector, SelectionKey.OP_READ);
}
private void read(SelectionKey key) throws IOException {
read.clear();
SocketChannel channel = (SocketChannel) key.channel();
channel.configureBlocking(false);
int num;
if ((num = channel.read(read)) == -1) {
System.out.println("未读到信息");
} else {
Selector selector = key.selector();
channel.register(selector, SelectionKey.OP_WRITE);
getStr = new String(read.array(), 0, num);
}
}
}
客户端
package AboutNioSocket.SimpleNioSocket;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;
/**
* @author hsw
* @create 2018-09-11 16:09
*/
public class NioSocketServer {
public static void main(String[] args) throws IOException, InterruptedException {
new NioSocketServer().start();
}
private void start() throws IOException, InterruptedException {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
Selector selector = Selector.open();
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
serverSocketChannel.bind(new InetSocketAddress(8001));
while (true) {
if (selector.select(3000) != 0) {
Set<SelectionKey> keys = selector.selectedKeys();
System.out.println("keys.length is " + keys.size());
Iterator<SelectionKey> iterator = keys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
//如果该key所在的channel或者selector关闭了,这里就会返回true
if (key.isAcceptable()) {
accept(key);
} else if (key.isWritable()) {
write(key);
} else if (key.isReadable()) {
read(key);
}
}
} else {
System.out.println();
Thread.sleep(800);
}
}
}
private ByteBuffer read = ByteBuffer.allocate(1024);
private ByteBuffer write = ByteBuffer.allocate(1024);
private String getStr;
private void accept(SelectionKey key) throws IOException {
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
SocketChannel socketChannel = serverSocketChannel.accept();
Selector selector = key.selector();
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
System.out.println("connect successfully");
}
private void write(SelectionKey key) throws IOException {
write.clear();
write.put(getStr.getBytes());
write.flip();
SocketChannel channel = (SocketChannel) key.channel();
channel.configureBlocking(false);
channel.write(write);
Selector selector = key.selector();
channel.register(selector, SelectionKey.OP_READ);
}
private void read(SelectionKey key) throws IOException {
read.clear();
SocketChannel channel = (SocketChannel) key.channel();
channel.configureBlocking(false);
int num;
if ((num = channel.read(read)) == -1) {
System.out.println("未读到信息");
} else {
Selector selector = key.selector();
channel.register(selector, SelectionKey.OP_WRITE);
getStr = new String(read.array(), 0, num);
}
}
}
四、代码流程解析
以服务器端为例(客户端原理差不多)
首先我们可以通过ServerSocketChannle的静态open方法产生一个ServerSocketChannel对象(其对应于一个ServerSocket对象,可以使用其socket()方法产生,然后运用这个ServerSocket对象进行监听,那还就是使用原来的socket了);有了ServerSocketChannel对象,使用configureBlocking(false)设置其为非阻塞模式,这样就可以异步处理其他的代码,可以在后面调用register方法注册Selector了。注册时会给其设置四种操作方法之一:
SelectionKey.OP_ACCEPT
SelectionKey.OP_CONNECT
SelectionKey.OP_WRITE
SelectionKey.OP_READ
代码很简单,基本的操作都写到注释中去了,这里不做详细说明
要知道的是Selector与Channel没有主属关系,没有谁属于谁的关系,一个Selector可以注册多个Chennel,一个Channel能注册多个Selector。Selector像快递分拣员一样能处理来自于不同地方的物流请求,Selector相比之下分的更细,他可以按照不同的类型分拣。分拣后的结果保存在SelectionKey中,可以通过SelectionKey中的channel方法和selector方法获取相应的channel和selector,通过isAcctable()、isConnectable()、isWritable()、isReadable()判断请求属于什么操作!
五、Buffer缓冲流
我们可以看到每次进行写操作时会调用clear(),读取时使用flip()方法,两个方法的具体操作在注释中已经写明,但这四个属性都有什么含义呢?
答:capacity:容量,在初始化时已经设置好,使用过程中不能改变!
limit:可以使用的范围的上限,开始时默认和capacity的值相等。当我们写入了一个字符串时,其大小就和该字符串的字节数相等,我们最多能操作的索引上限也就是这个数值!
position:当前操作的索引值,从0开始,随着get和put方法的操作改变。
mark:暂时保存position的位置,比如当前的position的值为10,我想现在访问15-20之间的内容,可以使用mark()方法保存当前的position的值,然后通过position(int pos)方法设置position的值为15,访问完后调用reset()方法使position = mark!需要注意的是如果将position的值设置得比mark小,mark就会被还原为默认值-1.
四个属性的大小:mark <= position <= limit <= capacity
后续更新!
网友评论