原生Nio编程
NIO提供了与传统BIO模型中的Socket和ServerSocket相对应的SocketChannel和ServerSocketChannel两种不同的套接字通道实现。
在 jdk1.4 里提供的新 api 。 Sun 官方标榜的特性如下:
- 为所有的原始类型提供(Buffer)缓存支持;
- 字符集编码解码解决方案;
- Channel: 一个新的原始I/O抽象;
- 支持锁和内存映射文件的文件访问接口;
- 提供多路(non-blocking)非阻塞式的高伸缩性网络I/O。
Nio编程中有三个重要的组件Channel、Buffer和Selector。
缓冲区Buffer
Buffer是一个对象,包含一些要写入或者读出的数据。
在NIO库中,所有数据都是用缓冲区处理的。在读取数据时,它是直接读到缓冲区中的;在写入数据时,也是写入到缓冲区中。任何时候访问NIO中的数据,都是通过缓冲区进行操作。
缓冲区实际上是一个数组,并提供了对数据结构化访问以及维护读写位置等信息。
具体的缓存区有这些:ByteBuffe、CharBuffer、 ShortBuffer、IntBuffer、LongBuffer、FloatBuffer、DoubleBuffer。他们实现了相同的接口:Buffer。
通道channel
我们对数据的读取和写入要通过Channel,它就像水管一样,是一个通道。通道不同于流的地方就是通道是双向的,可以用于读、写和同时读写操作。
底层的操作系统的通道一般都是全双工的,所以全双工的Channel比流能更好的映射底层操作系统的API。
Channel主要分为两大类:
SelectableChannel:用户网络读写,SocketChannel和ServerSocketChannel都是其子类;
FileChannel:用于文件操作。
详细的简介:http://www.iteye.com/topic/834447
多路复用器Selector
Selector是Java NIO 编程的基础。
Selector提供选择已经就绪的任务的能力:Selector会不断轮询注册在其上的Channel,如果某个Channel上面发生读或者写事件,这个Channel就处于就绪状态,会被Selector轮询出来,然后通过SelectionKey可以获取就绪Channel的集合,进行后续的I/O操作。
一个Selector可以同时轮询多个Channel,因为JDK使用了epoll()代替传统的select实现,所以没有最大连接句柄1024/2048的限制。所以,只需要一个线程负责Selector的轮询,就可以接入成千上万的客户端。
Nio服务端代码demo
NIO创建的NioServer源码:
public class NioServer {
private static int DEFAULT_PORT = 12345;
private static NioserverHandle serverHandle;
public static void start() {
start(DEFAULT_PORT);
}
public static synchronized void start (int port) {
if (serverHandle != null) {
serverHandle.stop();
}
serverHandle = new NioserverHandle(port);
new Thread(serverHandle, "Server").start();
}
public static void main(String[] args) {
start();
}
}
NioserverHandle:
public class NioserverHandle implements Runnable {
private Selector selector;
private ServerSocketChannel serverChannel;
private volatile boolean started;
public NioserverHandle(int port) {
try {
// 1. 创建选择器
selector = Selector.open();
// 2. 打开监听通道
serverChannel = ServerSocketChannel.open();
// 3. 开启通道为非阻塞模式
serverChannel.configureBlocking(false);
// 4. 绑定端口 backlog 设置为1024
serverChannel.socket().bind(new InetSocketAddress(port), 1024);
// 5. 监听客户端连接请求
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
// 6. 标记服务器已开启
started = true;
System.out.println("nio server 服务器已经开启,端口号:" + port);
} catch (IOException e) {
e.printStackTrace();
System.exit(1);
}
}
public void stop() {
started = false;
}
@Override
public void run() {
// 循环遍历selector
while (started) {
try {
// 无论是否有读写发生,selector每隔1s被唤醒一次
selector.select(1000);
// 阻塞,只有当至少一个注册的事件发生的时候才会继续
// selector.select();
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> it = keys.iterator();
SelectionKey key = null;
while (it.hasNext()) {
key = it.next();
it.remove();
try {
handleInput(key);
} catch (Exception e) {
if (key != null) {
key.cancel();
if (key.channel() != null) {
key.channel().close();
}
}
}
}
} catch (Throwable t) {
t.printStackTrace();
}
}
// selector 关闭后会自动释放里面管理的资源
if (selector != null) {
try {
selector.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
private void handleInput(SelectionKey key) throws IOException {
if (key.isValid()) {
// 处理新接入的请求信息
if (key.isAcceptable()) {
ServerSocketChannel ssc = (ServerSocketChannel)key.channel();
// 通过ServerSocketChannel的accept创建SocketChannel实例
// 完成该操作意味着完成TCP三次握手,TCP物理链路正式建立
SocketChannel sc = ssc.accept();
// 设置为非阻塞的
sc.configureBlocking(false);
// 注册为读事件
sc.register(selector, SelectionKey.OP_READ);
}
// 读消息
if (key.isReadable()) {
SocketChannel sc = (SocketChannel) key.channel();
// 创建buffer,并开辟一个1M的缓冲区
ByteBuffer buffer = ByteBuffer.allocate(1024);
// 读取请求码流,返回读取到的字节数
int readBytes = sc.read(buffer);
// 读取到字节,对字节进行编码解码
if (readBytes > 0) {
// 将缓冲区当前的 limit 设置为 position=0,用于后续对缓冲区的读取操作
buffer.flip();
// 根据缓冲区可读字节数创建字节数组
byte[] bytes = new byte[buffer.remaining()];
// 将缓冲区可读字节复制到新建的数组中
buffer.get(bytes);
String expression = new String(bytes, "UTF-8");
System.out.println("服务端收到消息:" + expression);
// 处理数据
// ...
// 模拟应答
String result = expression + "_nioserver";
doWrite(sc, result);
} else if (readBytes == 0){ // 没有读到字节,忽略
;
} else { // readBytes < 0 ,表示链路已经关闭,释放资源
key.cancel();
sc.close();
}
}
}
}
private void doWrite(SocketChannel channel, String response) throws IOException {
// 将消息编码为字节数组
byte[] bytes = response.getBytes();
// 根据数组容量创建ByteBuffer
ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
// 将字节复制到缓冲区
writeBuffer.put(bytes);
writeBuffer.flip();
// 发送缓冲区的字节数组
channel.write(writeBuffer);
// **** 此处不含处理“写半包”的代码
}
}
可以看到创建Nio服务端的主要步骤如下:
打开ServerSocketChannel,监听客户端连接
绑定监听端口,设置连接为非阻塞模式
创建Reactor线程,创建多路复用器并启动线程
将ServerSocketChannel注册到Reactor线程中的Selector上,监听ACCEPT事件
Selector轮询准备就绪的key
Selector监听到新的客户端接入,处理新的接入请求,完成TCP三次握手,简历物理链路
设置客户端链路为非阻塞模式
将新接入的客户端连接注册到Reactor线程的Selector上,监听读操作,读取客户端发送的网络消息
异步读取客户端消息到缓冲区
对Buffer编解码,处理半包消息,将解码成功的消息封装成Task
将应答消息编码为Buffer,调用SocketChannel的write将消息异步发送给客户端
因为应答消息的发送,SocketChannel也是异步非阻塞的,所以不能保证一次能吧需要发送的数据发送完,此时就会出现写半包的问题。我们需要注册写操作,不断轮询Selector将没有发送完的消息发送完毕,然后通过Buffer的hasRemain()方法判断消息是否发送完成。
Nio客户端代码
NioClient
public class NioClient {
private static String DEFAULT_HOST = "127.0.0.1";
private static int DEFAULT_PORT = 12345;
private static NioclientHandle clientHandle;
public static void start() {
start(DEFAULT_HOST, DEFAULT_PORT);
}
public static synchronized void start(String ip, int port) {
if (clientHandle != null) {
clientHandle.stop();
}
clientHandle = new NioclientHandle(ip, port);
new Thread(clientHandle, "client").start();
}
// 向服务器发送信息
public static boolean sendMsg(String msg) throws Exception {
if (msg.equals("q"))
return false;
clientHandle.sendMsg(msg);
return true;
}
public static void main(String[] args) {
start();
}
}
客户端处理类:
public class NioclientHandle implements Runnable {
private String host;
private int port;
private Selector selector;
private SocketChannel socketChannel;
private volatile boolean started;
public NioclientHandle(String ip, int port) {
this.host = ip;
this.port = port;
try {
// 创建选择器
selector = Selector.open();
// 打开监听通道
socketChannel = SocketChannel.open();
// 如果为true,则此通道被置于阻塞模式;如果为false,则此通道被置于非阻塞模式
socketChannel.configureBlocking(false); // 非阻塞模式
started = true;
} catch (IOException e) {
e.printStackTrace();
System.exit(1);
}
}
@Override
public void run() {
// 建立客户端连接
try {
doConnect();
} catch (IOException e) {
e.printStackTrace();
}
// 循环遍历selector
while(started) {
try {
// 无论是否有读写事件发生,selector每隔1s被唤醒一次
selector.select(1000);
// 阻塞,只有当至少一个注册的事件发生的时候才会继续
// selector.select();
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> it = keys.iterator();
SelectionKey key = null;
while(it.hasNext()) {
key = it.next();
it.remove();
try {
handleInput(key);
} catch (IOException e) {
if (key != null) {
key.cancel();
if (key.channel() != null) {
key.channel().close();
}
}
}
}
} catch (IOException e) {
e.printStackTrace();
System.exit(1);
}
}
// selector 关闭后会自动释放里面管理的资源
if (selector != null) {
try {
selector.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
private void handleInput(SelectionKey key) throws IOException {
if (key.isValid()) {
SocketChannel sc = (SocketChannel) key.channel();
if (key.isConnectable()) {
if (sc.finishConnect());
else System.exit(1);
}
// 读消息
if (key.isReadable()) {
// 创建一个ByteBuffer,并开辟一个1M的缓冲区
ByteBuffer buffer = ByteBuffer.allocate(1024);
//读取请求码流,返回读取到的字节数
int readBytes = sc.read(buffer);
//读取到字节,对字节进行编解码
if(readBytes>0){
//将缓冲区当前的limit设置为position=0,用于后续对缓冲区的读取操作
buffer.flip();
//根据缓冲区可读字节数创建字节数组
byte[] bytes = new byte[buffer.remaining()];
//将缓冲区可读字节数组复制到新建的数组中
buffer.get(bytes);
String result = new String(bytes,"UTF-8");
System.out.println("客户端收到消息:" + result);
}
//没有读取到字节 忽略
// else if(readBytes==0);
//链路已经关闭,释放资源
else if(readBytes<0){
key.cancel();
sc.close();
}
}
}
}
public void stop() {
started = false;
}
private void doConnect() throws IOException {
if (socketChannel.connect(new InetSocketAddress(host, port)));
else
socketChannel.register(selector, SelectionKey.OP_CONNECT);
}
public void sendMsg(String msg) throws Exception {
socketChannel.register(selector,SelectionKey.OP_READ);
doWrite(socketChannel, msg);
}
private void doWrite(SocketChannel channel, String request) throws IOException {
// 将消息编码为字节数组
byte[] bytes = request.getBytes();
// 根据数组容量创建ByteBuffer
ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
// 将字节数组复制到缓冲区
writeBuffer.put(bytes);
// flip操作
writeBuffer.flip();
// 发送缓冲区的字节数组
channel.write(writeBuffer);
}
}
测试类:
public class NioTest {
public static void main(String[] args) throws Exception {
// 运行服务端
NioServer.start();
Thread.sleep(1000);
// 运行客户端
NioClient.start();
while (NioClient.sendMsg(new Scanner(System.in).nextLine()));
}
}
网友评论