JDK1.7升级了NIO类库,升级后的NIO类库被称为NIO2.0。同时,Java正式提供了异步文件IO操作,同时提供给了与UNIX网络编程事件驱动IO对应的AIO。
NIO2.0引入了新的异步通道的概念,提供了异步文件通道和异步套接字通道的实现。异步通道以两种方式获取操作结果。
-
通过java.util.concurrent.Future来来获取异步操作的结果;
-
在执行异步操作的时候传入一个java.nio.channels。
NIO2.0的异步套接字是真正的异步非阻塞IO,对应UNIX网络编程中的事件驱动IO,即AIO。它不需要通过多路复用选择器Selector对注册的通道进行轮询操作即可实现异步读写,简化了NIO的编程模型。服务器代码:
package com.test.aio;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.CountDownLatch;
/**
* AIO编程服务器端
* @author 程就人生
* @Date
*/
public class HelloServer {
public static void main( String[] args ){
int port = 8080;
AsyncHelloServerHandler helloServer = new AsyncHelloServerHandler(port);
new Thread(helloServer,"AIO").start();
}
}
/**
* AIO编程handler处理类
* @author 程就人生
* @Date
*/
class AsyncHelloServerHandler implements Runnable{
private int port;
CountDownLatch latch;
AsynchronousServerSocketChannel asynchronousServerSocketChannel;
public AsyncHelloServerHandler(int port) {
this.port = port;
try {
// 创建一个异步的服务端通道
asynchronousServerSocketChannel = AsynchronousServerSocketChannel.open();
// 绑定监听端口
asynchronousServerSocketChannel.bind(new InetSocketAddress(port));
System.out.println("AIO编程,服务器端已启动,启动端口号为:" + port);
} catch (IOException e) {
e.printStackTrace();
}
}
public void run() {
// 完成一组正在执行的操作之前,允许当前的线程一直阻塞
latch = new CountDownLatch(1);
// 接收客户端的连接
doAccept();
try{
latch.await();
}catch(InterruptedException e){
e.printStackTrace();
}
}
private void doAccept() {
asynchronousServerSocketChannel.accept(this, new AcceptCompletionHandler());
}
}
/**
* 接收新的客户端连接
* @author 程就人生
* @Date
*/
class AcceptCompletionHandler implements CompletionHandler<AsynchronousSocketChannel, AsyncHelloServerHandler>{
public void completed(AsynchronousSocketChannel result, AsyncHelloServerHandler attachment) {
attachment.asynchronousServerSocketChannel.accept(attachment, this);
ByteBuffer buffer = ByteBuffer.allocate(1024);
result.read(buffer, buffer, new ReadCompletionHandler(result));
}
public void failed(Throwable exc, AsyncHelloServerHandler attachment) {
attachment.latch.countDown();
}
}
/**
* 接收通知回调处理的handler
* @author 程就人生
* @Date
*/
class ReadCompletionHandler implements CompletionHandler<Integer, ByteBuffer>{
private AsynchronousSocketChannel channel;
public ReadCompletionHandler(AsynchronousSocketChannel channel) {
this.channel = channel;
}
public void completed(Integer result, ByteBuffer attachment) {
attachment.flip();
byte[] body = new byte[attachment.remaining()];
attachment.get(body);
try {
String req = new String(body, "utf-8");
System.out.println("服务器端收到的:" + req);
// 应答
doWrite();
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
private void doWrite() {
byte[] bytes = "服务器端的反馈消息".getBytes();
// 发送缓冲区
ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
writeBuffer.put(bytes);
writeBuffer.flip();
channel.write(writeBuffer, writeBuffer, new CompletionHandler<Integer, ByteBuffer>(){
public void completed(Integer result, ByteBuffer buffer) {
// 如果没有发送完,继续发送
if(buffer.hasRemaining()){
channel.write(buffer, buffer, this);
}
}
public void failed(Throwable exc, ByteBuffer buffer) {
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
});
}
public void failed(Throwable exc, ByteBuffer attachment) {
try {
this.channel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
在AsyncHelloServerHandler 的构造方法中,首先创建一个异步的服务器端通道AsynchronousServerSocketChannel,然后调用bind方法绑定监听端口。
在线程run方法中,初始化CountDownLatch对象,完成一组正在执行的操作之前,允许当前的线程一直阻塞,防止服务器端执行完退出,实际项目中不需要。
使用doAccept方法接收客户端的连接,因为是异步操作,在这里传递一个CompletionHandler类型的handler实例接收accept操作成功的通知消息。在AcceptCompletionHandler 类中可以接收新加入的客户端连接。在77行预分配1KB的缓冲区。78行调用read进行异步读操作。
在118行先进行flip操作,为后续从缓冲区读取数据做准备。根据缓冲区的可读字节数创建byte数据组,通过new String对字节数组进行编码,并打印出来。最后通过doWrite回写客户端。
客户端代码:
package com.test.aio;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.CountDownLatch;
/**
* AIO编程客户端
* @author 程就人生
* @Date
*/
public class HelloClient {
public static void main( String[] args ){
int port = 8080;
new Thread(new AsyncHelloHandler("127.0.0.1",port)).start();
}
}
class AsyncHelloHandler implements CompletionHandler<Void, AsyncHelloHandler>, Runnable{
private String host;
private int port;
private AsynchronousSocketChannel client;
private CountDownLatch latch;
public AsyncHelloHandler(String host, int port) {
this.host = host;
this.port = port;
try {
client = AsynchronousSocketChannel.open();
} catch (IOException e) {
e.printStackTrace();
}
}
public void run() {
latch = new CountDownLatch(1);
client.connect(new InetSocketAddress(host, port), this, this);
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
client.close();
} catch (IOException e) {
e.printStackTrace();
}
}
public void completed(Void result, AsyncHelloHandler attachment) {
byte[] req = "客户端的消息".getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
writeBuffer.put(req);
writeBuffer.flip();
client.write(writeBuffer,writeBuffer, new CompletionHandler<Integer, ByteBuffer>(){
public void completed(Integer result, ByteBuffer buffer) {
if(buffer.hasRemaining()){
client.write(buffer, buffer, this);
}else{
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
client.read(readBuffer, readBuffer, new CompletionHandler<Integer, ByteBuffer>(){
public void completed(Integer result, ByteBuffer buffer) {
buffer.flip();
byte[] bytes = new byte[buffer.remaining()];
buffer.get(bytes);
try {
String body = new String(bytes, "utf-8");
System.out.println("客户端读取到的:" + body);
latch.countDown();
System.out.println("客户端操作完毕,关闭连接");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
public void failed(Throwable exc, ByteBuffer buffer) {
try {
client.close();
latch.countDown();
} catch (IOException e) {
e.printStackTrace();
}
}
});
}
}
public void failed(Throwable exc, ByteBuffer attachment) {
try {
client.close();
latch.countDown();
} catch (IOException e) {
e.printStackTrace();
}
}
});
}
public void failed(Throwable exc, AsyncHelloHandler attachment) {
try {
client.close();
latch.countDown();
} catch (IOException e) {
e.printStackTrace();
}
}
}
在20行,客户端通过一个独立的线程创建异步客户端处理handler。在AsyncHelloHandler中,使用了大量匿名内部类。第38行,通过AsynchronousSocketChannel的open方法创建了一个新的AsynchronousSocketChannel对象。
在第45行,创建CountDownLatch进行等待,防止异步操咋没有执行完线程就退出了。在第46行,通过connect方法发起异步操作。
在第73行,completed异步连接成功之后的方法回调,创建请求消息体,对bytebuffer进行flip,并发给服务器端。发送给服务器端后,接收服务器端的反馈,并进行打印,最后调用latch的countDown()方法,关闭客户端连接。
服务器端执行结果:
image.png客户端执行结果:
image.png以上便是来自java.nio包的非阻塞异步服务器端、客户端编码的简单演示。
网友评论