如果不了解AIO可以看下我的其他文章,这篇博客主要是通过Java 提供的AIO接口编写一个client与server互相发送消息的模型。
直接上代码了:
AioClient.Java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;
import java.util.concurrent.ExecutionException;
/**
* @description:
* @author: sanjin
* @date: 2019/7/9 15:20
*/
public class AioClient {
private AsynchronousSocketChannel clientChannel;
public AioClient(String host, int port) {
init(host,port);
}
private void init(String host, int port) {
try {
clientChannel = AsynchronousSocketChannel.open();
clientChannel.connect(new InetSocketAddress(host,port));
} catch (IOException e) {
e.printStackTrace();
}
}
public void doWrite(String line) {
ByteBuffer buffer = ByteBuffer.allocate(1024);
buffer.put(line.getBytes(StandardCharsets.UTF_8));
buffer.flip();
clientChannel.write(buffer);
}
public void doRead() {
ByteBuffer buffer = ByteBuffer.allocate(1024);
try {
// read() 是一个异步方法,实际由OS实现,
// get()会阻塞,此处使用阻塞是因为后面要把结果打印
// 也可以去掉get,但是就必须实现 CompletionHandler
// 就像server端读取数据那样
clientChannel.read(buffer).get();
buffer.flip();
System.out.println("from server: "+new String(buffer.array(),StandardCharsets.UTF_8));
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
public void doDestory() {
if (null != clientChannel) {
try {
clientChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
AioClient client = new AioClient("localhost", 8000);
try {
System.out.println("enter your message to server : ");
Scanner s = new Scanner(System.in);
String line = s.nextLine();
client.doWrite(line);
client.doRead();
} finally {
client.doDestory();
}
}
}
AioServer.java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* @description:
* @author: sanjin
* @date: 2019/7/9 15:20
*/
public class AioServer {
private ExecutorService service;
private AsynchronousServerSocketChannel serverChannel;
public ExecutorService getService() {
return service;
}
public AsynchronousServerSocketChannel getServerChannel() {
return serverChannel;
}
public AioServer(int port) {
init(port);
}
private void init(int port) {
System.out.println("server starting at port "+port+"..");
// 初始化定长线程池
service = Executors.newFixedThreadPool(4);
try {
// 初始化 AsyncronousServersocketChannel
serverChannel = AsynchronousServerSocketChannel.open();
// 监听端口
serverChannel.bind(new InetSocketAddress(port));
// 监听客户端连接,但在AIO,每次accept只能接收一个client,所以需要
// 在处理逻辑种再次调用accept用于开启下一次的监听
// 类似于链式调用
serverChannel.accept(this, new AioHandler());
try {
// 阻塞程序,防止被GC回收
TimeUnit.SECONDS.sleep(Long.MAX_VALUE);
} catch (InterruptedException e) {
e.printStackTrace();
}
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
new AioServer(8000);
}
}
AioHandler.java
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;
/**
* @description: client连接后的回调函数
* @author: sanjin
* @date: 2019/7/9 15:20
*/
public class AioHandler implements CompletionHandler<AsynchronousSocketChannel, AioServer> {
@Override
public void completed(AsynchronousSocketChannel result, AioServer attachment) {
// 处理下一次的client连接。类似链式调用
attachment.getServerChannel().accept(attachment, this);
// 执行业务逻辑
doRead(result);
}
/**
* 读取client发送的消息打印到控制台
*
* AIO中OS已经帮助我们完成了read的IO操作,所以我们直接拿到了读取的结果
*
*
* @param clientChannel 服务端于客户端通信的 channel
*/
private void doRead(AsynchronousSocketChannel clientChannel) {
ByteBuffer buffer = ByteBuffer.allocate(1024);
// 从client读取数据,在我们调用clientChannel.read()之前OS,已经帮我们完成了IO操作
// 我们只需要用一个缓冲区来存放读取的内容即可
clientChannel.read(
buffer, // 用于数据中转缓冲区
buffer, // 用于存储client发送的数据的缓冲区
new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer attachment) {
System.out.println("receive client data length:" + attachment.capacity() + " byte");
attachment.flip(); // 移动 limit位置
// 读取client发送的数据
System.out.println("from client : "+new String(attachment.array(), StandardCharsets.UTF_8));
// 向client写入数据
doWrite(clientChannel);
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
}
}
);
}
private void doWrite(AsynchronousSocketChannel clientChannel) {
// 向client发送数据,clientChannel.write()是一个异步调用,该方法执行后会通知
// OS执行写的IO操作,会立即返回
ByteBuffer buffer = ByteBuffer.allocate(1024);
Scanner s = new Scanner(System.in);
String line = s.nextLine();
buffer.put(line.getBytes(StandardCharsets.UTF_8));
buffer.flip();
clientChannel.write(buffer);
// clientChannel.write(buffer).get(); // 会进行阻塞,直到OS写操作完成
}
/**
* 异常处理逻辑
*
* @param exc
* @param attachment
*/
@Override
public void failed(Throwable exc, AioServer attachment) {
exc.printStackTrace();
}
}
一共有三个类:
-
AioClient
:client端 -
AioServer
:server端 -
AioHandler
:AIO中实际是由OS完成IO操作,当OS完成IO操作后,就会调用相应的callback(回调函数),AioHandler
就是回调函数,用于处理业务逻辑。
运行结果
先执行AioServer
,在执行AioClient
:
控制台AioClient
显示:
控制台
AioServer
显示:1.png
在控制台AioClient
输入要发送给server的内容,然后回车:
我发送的是“吴亦凡”
查看控制台
AioServer
:2.png
可以看到,控制台打印除了client发送的“鹿晗”,下面我们回复client:"鸡你太美!"
1.png
回到控制台AioClient
可以看到server发送的内容:
参考教程:
https://www.bilibili.com/video/av43425053/?p=1
https://www.bilibili.com/video/av43425053/?p=2
网友评论