Java AIO

作者: 三丶斤 | 来源:发表于2019-07-09 16:19 被阅读0次

如果不了解AIO可以看下我的其他文章,这篇博客主要是通过Java 提供的AIO接口编写一个client与server互相发送消息的模型。
直接上代码了:
AioClient.Java


import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;
import java.util.concurrent.ExecutionException;

/**
 * @description:
 * @author: sanjin
 * @date: 2019/7/9 15:20
 */
public class AioClient {
    private AsynchronousSocketChannel clientChannel;

    public AioClient(String host, int port) {
        init(host,port);
    }

    private void init(String host, int port) {
        try {
            clientChannel = AsynchronousSocketChannel.open();
            clientChannel.connect(new InetSocketAddress(host,port));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void doWrite(String line) {
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        buffer.put(line.getBytes(StandardCharsets.UTF_8));
        buffer.flip();
        clientChannel.write(buffer);
    }

    public void doRead() {
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        try {
            // read() 是一个异步方法,实际由OS实现,
            // get()会阻塞,此处使用阻塞是因为后面要把结果打印
            // 也可以去掉get,但是就必须实现 CompletionHandler
            // 就像server端读取数据那样
            clientChannel.read(buffer).get();
            buffer.flip();
            System.out.println("from server: "+new String(buffer.array(),StandardCharsets.UTF_8));
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }

    public void doDestory() {
        if (null != clientChannel) {
            try {
                clientChannel.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        AioClient client = new AioClient("localhost", 8000);
        try {
            System.out.println("enter your message to server : ");
            Scanner s = new Scanner(System.in);
            String line = s.nextLine();
            client.doWrite(line);
            client.doRead();
        } finally {
            client.doDestory();
        }
    }
}

AioServer.java


import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * @description:
 * @author: sanjin
 * @date: 2019/7/9 15:20
 */
public class AioServer {

    private ExecutorService service;

    private AsynchronousServerSocketChannel serverChannel;

    public ExecutorService getService() {
        return service;
    }

    public AsynchronousServerSocketChannel getServerChannel() {
        return serverChannel;
    }

    public AioServer(int port) {
        init(port);
    }



    private void init(int port) {
        System.out.println("server starting at port "+port+"..");
        // 初始化定长线程池
        service = Executors.newFixedThreadPool(4);
        try {
            // 初始化 AsyncronousServersocketChannel
            serverChannel = AsynchronousServerSocketChannel.open();
            // 监听端口
            serverChannel.bind(new InetSocketAddress(port));
            // 监听客户端连接,但在AIO,每次accept只能接收一个client,所以需要
            // 在处理逻辑种再次调用accept用于开启下一次的监听
            // 类似于链式调用
            serverChannel.accept(this, new AioHandler());

            try {
                // 阻塞程序,防止被GC回收
                TimeUnit.SECONDS.sleep(Long.MAX_VALUE);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        new AioServer(8000);
    }
}

AioHandler.java


import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;


/**
 * @description: client连接后的回调函数
 * @author: sanjin
 * @date: 2019/7/9 15:20
 */
public class AioHandler implements CompletionHandler<AsynchronousSocketChannel, AioServer> {
    @Override
    public void completed(AsynchronousSocketChannel result, AioServer attachment) {
        // 处理下一次的client连接。类似链式调用
        attachment.getServerChannel().accept(attachment, this);

        // 执行业务逻辑
        doRead(result);
    }

    /**
     * 读取client发送的消息打印到控制台
     *
     * AIO中OS已经帮助我们完成了read的IO操作,所以我们直接拿到了读取的结果
     *
     *
     * @param clientChannel 服务端于客户端通信的 channel
     */
    private void doRead(AsynchronousSocketChannel clientChannel) {
        ByteBuffer buffer = ByteBuffer.allocate(1024);

        // 从client读取数据,在我们调用clientChannel.read()之前OS,已经帮我们完成了IO操作
        // 我们只需要用一个缓冲区来存放读取的内容即可
        clientChannel.read(
                buffer,   // 用于数据中转缓冲区
                buffer,   // 用于存储client发送的数据的缓冲区
                new CompletionHandler<Integer, ByteBuffer>() {
                    @Override
                    public void completed(Integer result, ByteBuffer attachment) {
                        System.out.println("receive client data length:" + attachment.capacity() + " byte");
                        attachment.flip(); // 移动 limit位置
                        // 读取client发送的数据
                        System.out.println("from client : "+new String(attachment.array(), StandardCharsets.UTF_8));

                        // 向client写入数据
                        doWrite(clientChannel);
                    }

                    @Override
                    public void failed(Throwable exc, ByteBuffer attachment) {

                    }
                }
        );
    }

    private void doWrite(AsynchronousSocketChannel clientChannel) {

        // 向client发送数据,clientChannel.write()是一个异步调用,该方法执行后会通知
        // OS执行写的IO操作,会立即返回
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        Scanner s = new Scanner(System.in);
        String line = s.nextLine();
        buffer.put(line.getBytes(StandardCharsets.UTF_8));
        buffer.flip();
        clientChannel.write(buffer);
        // clientChannel.write(buffer).get(); // 会进行阻塞,直到OS写操作完成
    }

    /**
     * 异常处理逻辑
     *
     * @param exc
     * @param attachment
     */
    @Override
    public void failed(Throwable exc, AioServer attachment) {
        exc.printStackTrace();
    }
}

一共有三个类:

  • AioClient:client端
  • AioServer:server端
  • AioHandler:AIO中实际是由OS完成IO操作,当OS完成IO操作后,就会调用相应的callback(回调函数),AioHandler就是回调函数,用于处理业务逻辑。

运行结果

先执行AioServer,在执行AioClient:
控制台AioClient显示:

1.png
控制台AioServer显示:
1.png

在控制台AioClient输入要发送给server的内容,然后回车:
我发送的是“吴亦凡”

1.png
查看控制台AioServer:
2.png
可以看到,控制台打印除了client发送的“鹿晗”,下面我们回复client:"鸡你太美!"
1.png

回到控制台AioClient可以看到server发送的内容:

2.png

参考教程:

https://www.bilibili.com/video/av43425053/?p=1
https://www.bilibili.com/video/av43425053/?p=2

相关文章

  • Java AIO-Proactor模式

    Java7之前只支持BIO、NIO,但在Java 7 时添加了Java AIO,Java AIO基于epoll模式...

  • BIO,NIO,AIO 总结

    BIO,NIO,AIO 总结 Java 中的 BIO、NIO和 AIO 理解为是 Java 语言对操作系统的各种 ...

  • BIO,NIO,AIO 总结

    BIO,NIO,AIO 总结 Java 中的 BIO、NIO和 AIO 理解为是 Java 语言对操作系统的各种 ...

  • Java AIO基础API

    JDK1.7 AIO是异步非阻塞的,代码上AIO简化了NIO很多繁琐的实现。 AIOServer.java AIO...

  • Java AIO

    如果不了解AIO可以看下我的其他文章,这篇博客主要是通过Java 提供的AIO接口编写一个client与serve...

  • JAVA AIO

    JDK7中新增了一些与文件(网络)I/O相关的一些API,这些API被称为NIO2,或称为AIO(Asynchro...

  • NIO教程 ——检视阅读(上)

    NIO教程 ——检视阅读 参考 BIO,NIO,AIO 总结 Java NIO浅析 Java NIO 教程——极客...

  • Skill Tree

    Java IO: BIO/NIO/AIO 并发:Thread/Executor/Lock/Atomic/Concu...

  • BIO、NIO、AIO整理

    Java对BIO、NIO、AIO的支持: Java BIO (blocking I/O): 同步并阻塞,服务器实现...

  • [Java] Java NIO AIO

    Java IO的各种流是阻塞的。这意味着,当一个线程调用read() 或 write()时,该线程被阻塞,直到有一...

网友评论

      本文标题:Java AIO

      本文链接:https://www.haomeiwen.com/subject/yipxkctx.html