NIO是jdk1.4引入的java.nio包,它提供了高速的、面向块的IO。通过定义包含数据的类,以及通过块的形式处理这些数据。NIO类库包含缓冲区Buffer、多路复用选择器Selector、通道Channel等新的抽象,可以构建多路复用、同步非阻塞的IO程序,同时提供了更接近操作系统底层高性能的数据操作方式。
缓冲区Buffer,包含一些要写入或者要读出的数据。在面向流的IO中,可以将数据直接写入或者将数据直接读到Stream对象中。缓冲区提供了对数据的结构化访问以及维护读写位置等信息。
缓冲区Buffer是一个数组,通常是一个字节数组ByteBuffer,还有其他类型的数组字符缓冲区CharBuffer、短整型缓冲区ShortBuffer、整型缓冲区IntBuffer、长整形缓冲区LongBuffer、浮点型整型区FloatBuffer、双精度浮点型缓冲区。每一种Buffer的类都是Buffer接口的一个子实例。所以它们有完全一样的操作,只是操作的数据类型不一样。
通道Channel,Channel好比自来水管,网络数据通过Channel读取和写入。通道与流的不同之处在于,通道是双向的,而流是单向的,流只能在一个方向上移动,一个流必须是InputStream或者OutputStream的子类,通道可以用于读写或者两者同时进行。通道Channel是全双工的,所以它可以比流更好地映射底层操作系统的API。Channel可以分为两大类,一类用于网络对象,一类用于文件操作。
多路复用选择器Selector,Selector会不断轮询注册在其上的Channel,如果某个Channel上面发生读或者写事件,这个Channel就处于就绪状态,会被Selector轮询出来,通过SelectionKey可以获得就绪Channel的集合,进行后续的IO操作。
一个多路复用器Selector可以同时轮询多个Channel,由于JDK使用epoll()代替传统的select实现,所以它并没有最大连接句柄1024/2048的限制。只要一个线程负责Selector的轮询,就可以接入成千上万的客户端。
下面来看服务器端代码:
package com.test.nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
/**
* NIO服务器端
* @author 程就人生
* @Date
*/
public class HelloServer {
public static void main( String[] args ){
int port = 8080;
// 多路复用服务类
MultiplexerHelloServer helloServer = new MultiplexerHelloServer(port);
new Thread(helloServer,"多路复用服务类").start();
}
}
class MultiplexerHelloServer implements Runnable{
private Selector selector;
private ServerSocketChannel serverChannel;
private volatile boolean stop;
/**
* 初始化多路复用,绑定监听端口
* @param port
*/
MultiplexerHelloServer(int port){
try {
// 初始化多路复用器,创建Selector
selector = Selector.open();
// 打开ServerSocketChannel
serverChannel = ServerSocketChannel.open();
// 设置为非堵塞模式
serverChannel.configureBlocking(false);
// 绑定监听端口
serverChannel.socket().bind(new InetSocketAddress(port), 1024);
// 将ServerSocketChannel注册到Selector上去,监听accept事件
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("服务器端已启动,启动端口为:" + port);
} catch (IOException e) {
e.printStackTrace();
}
}
public void stop(){
this.stop = true;
}
public void run() {
while(!stop){
try {
SelectionKey key = null;
// 每隔一秒被唤醒一次
selector.select(1000);
// 获取就绪状态的SelectionKey
Set<SelectionKey> selectedKeys = selector.selectedKeys();
// 对就绪状态的SelectionKey进行迭代
Iterator<SelectionKey> it = selectedKeys.iterator();
while(it.hasNext()){
key = it.next();
it.remove();
try{
// 对网络事件进行操作(连接和读写操作)
handleInput(key);
}catch(Exception e){
if(key != null){
key.cancel();
if(key.channel() != null){
try {
key.channel().close();
} catch (IOException e1) {
e1.printStackTrace();
}
}
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
// 多路复用器关闭后,所有注册在上面的channel和pipe等资源都会被自动去注册并关闭,所以不需要重复释放资源
if(selector != null){
try {
selector.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
private void handleInput(SelectionKey key) throws IOException{
if(key.isValid()){
// 处理新接入的客户端请求信息
if(key.isAcceptable()){
// 接入新的连接
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
SocketChannel sc = ssc.accept();
// 设置为异步非阻塞
sc.configureBlocking(false);
// 监听读操作
sc.register(selector, SelectionKey.OP_READ);
}
// 处理客户端发来的信息,读取操作
if(key.isReadable()){
SocketChannel sc = (SocketChannel) key.channel();
// 开辟一个1KB的缓冲区
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
int readBytes = sc.read(readBuffer);
// 读取到了字节
if(readBytes > 0){
readBuffer.flip();
byte[] bytes = new byte[readBuffer.remaining()];
readBuffer.get(bytes);
// 对字节码进行解码
String body = new String(bytes, "UTF-8");
System.out.println("服务器端收到的:" + body);
// 回写客户端
doWrite(sc);
}
}
}
}
// 回写客户端
private void doWrite(SocketChannel sc) throws IOException{
byte[] bytes = "服务器端的响应来了".getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
writeBuffer.put(bytes);
writeBuffer.flip();
sc.write(writeBuffer);
}
}
在40-56行,在构造方法中对资源进行初始化。创建多路复用选择器Selector、ServerSocketChannel,对Channel和TCP参数进行配置。
在63行代码,对网络事件进行轮询监听;在67行代码中,每隔1s唤醒一次,监听多路复用选择器中是否有就绪的SelectionKey,如果有则进行遍历。
在105行代码中,在handleInput方法中,对SelectionKey进行判断。判断SelectionKey目前所处的状态,是接入的新连接,还是处于网络读状态。如果是新连接,则监听网络读操作。如果是网络读操作,在通过doWrite方法回写客户端。
客户端代码:
package com.test.nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
/**
* NIO客户端
* @author 程就人生
* @Date
*/
public class HelloClient {
public static void main( String[] args ){
int port = 8080;
new Thread(new HelloClientHandle("127.0.0.1", port)).start();
}
}
/**
* 客户端处理器
* @author 程就人生
* @Date
*/
class HelloClientHandle implements Runnable{
private Selector selector;
private SocketChannel socketChannel;
private volatile boolean stop;
private String host;
private int port;
public HelloClientHandle(String host, int port) {
try {
this.host = host;
this.port = port;
// 创建多路复用选择器
selector = Selector.open();
// 打开SocketChannel
socketChannel = SocketChannel.open();
// 设置为非阻塞
socketChannel.configureBlocking(false);
} catch (IOException e) {
e.printStackTrace();
}
}
public void run() {
try {
// 连接服务器端判断
if(!socketChannel.connect(new InetSocketAddress(host,port))){
// 将socketChannel注册到多路复用器,并监听连接操作
socketChannel.register(selector, SelectionKey.OP_CONNECT);
}
} catch (IOException e) {
e.printStackTrace();
}
while(!stop){
try {
selector.select(1000);
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> it = selectedKeys.iterator();
SelectionKey key = null;
while(it.hasNext()){
key = it.next();
it.remove();
handleInput(key);
}
} catch (IOException e) {
e.printStackTrace();
}
}
if(selector != null){
try {
selector.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
private void handleInput(SelectionKey key) throws IOException{
if(key.isValid()){
SocketChannel sc = (SocketChannel) key.channel();
if(key.isConnectable()){
// 完成连接
if(sc.finishConnect()){
// 监听读时间
sc.register(selector, SelectionKey.OP_READ);
// 给服务器端发送消息
doWrite(sc);
}else{
// 退出
System.exit(1);
}
}
// 是否为可读的
if(key.isReadable()){
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
int readBytes =sc.read(readBuffer);
// 读到了字节
if(readBytes > 0){
readBuffer.flip();
byte[] bytes = new byte[readBuffer.remaining()];
readBuffer.get(bytes);
String body = new String(bytes, "utf-8");
System.out.println("客户端收到的信息是:" + body);
this.stop = true;
// 没读到字节
} else if(readBytes < 0){
// 取消
key.cancel();
// 关闭连接
sc.close();
}
}
}
}
/**
* 网络写操作
* @param sc
* @throws IOException
*/
private void doWrite(SocketChannel sc) throws IOException{
byte[] req = "来自客户端的消息".getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
writeBuffer.put(req);
writeBuffer.flip();
sc.write(writeBuffer);
if(!writeBuffer.hasRemaining()){
System.out.println("已发送给服务器端~!");
}
}
}
客户端的连接步骤:先创建多路复用选择器,打开SocketChannel,绑定本地端地址,设置SocketChannel为非阻塞,异步连接服务器。将SocketChannel注册到多路选择器中,注册监听事件。
SocketChannel连接服务器连接成功后,对SelectionKey进行轮询监听,每隔10s唤醒一次。在97行,连接成功后,监听网络读事件,并给服务器端发送消息。
在第106行,坚挺到网络读事件后,将字节读出,并打印出来。如果读取完毕,则关闭通道,关闭连接。
服务器端运行结果 客户端运行结果客户端发起的连接操作是异步的,通过在多路复用器注册OP_CONNECT等待后续结果,不需要之前那样被同步阻塞。SocketChannel的读写操作都是异步的。如果没有可读写的数据不会同步等待。
以上便是来自java.nio包的非阻塞服务器端、客户端编码的简单演示。
网友评论