美文网首页
Java IO 演进之路

Java IO 演进之路

作者: 剑道_7ffc | 来源:发表于2020-03-22 14:44 被阅读0次

必须明白的几个概念

阻塞(Block)和非阻塞(Non-Block)

阻塞和非阻塞是在缓冲区数据没有准备就绪的情况下处理方式
阻塞:在没有就绪下,就一直等待哪里
非阻塞:在没有就绪下,则直接返回
阻塞和非阻塞是相对于io操作来讲
举例如下图:从机器内存到jvm内存


image.png

同步(Synchronization)和异步(Asynchronous)

同步和异步是处理io事件的采用方式
同步:应用程序参与io的处理
异步:io交给操作系统,应用程序只需要等待通知
异步和同步是相对于时间点,异步指同一时间点做多个处理,同步指同一时间点做一个处理。

读和写

读指input,写指output,输入和输出是相对于内存来讲的

BIO 与 NIO 对比

BIO:面向流(乡村公路),阻塞 IO(多线程)
NIO:面向缓冲(高速公路,多路复用技术),非阻塞IO(反应堆 Reactor),选择器(轮询机制)


image.png

阻塞和非阻塞

NIO是发现缓冲数据没有准备好,直接返回,处理其他事情,因此可以一个线程处理多个输入和输出通道。
BIO 是发现缓冲区数据没有准备好,则等待哪里,因此,可以一个线程处理一个输入和输出通道。

Java AIO

真正实现异步IO

代码实现

BIO

1 服务端

//同步阻塞IO模型
public class BIOServer {
    ServerSocket server;
    public BIOServer(int port){
        try {
            server = new ServerSocket(port);
            System.out.println("BIO服务已启动,监听端口是:" + port);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    public void listen() throws IOException{
        //循环监听
        while(true){
            //等待客户端连接,阻塞方法
            Socket client = server.accept();

            InputStream is = client.getInputStream();
            //网络客户端把数据发送到网卡,机器所得到的数据读到了JVM内中
            byte [] buff = new byte[1024];
            int len = is.read(buff);
            if(len > 0){
                String msg = new String(buff,0,len);
                System.out.println("收到" + msg);
            }
        }
    }   
    public static void main(String[] args) throws IOException {
        new BIOServer(8080).listen();
    }
}

2 客户端

public class BIOClient {
    public static void main(String[] args) throws UnknownHostException, IOException {
        Socket client = new Socket("localhost", 8080);

        OutputStream os = client.getOutputStream();

        //生成一个随机的ID
        String name = UUID.randomUUID().toString();

        System.out.println("客户端发送数据:" + name);
        os.write(name.getBytes());
        os.close();
        client.close();
    }
}

NIO

1 服务端

/**
 * NIO的操作过于繁琐,于是才有了Netty
 * Netty就是对这一系列非常繁琐的操作进行了封装
 */
public class NIOServerDemo {

    private int port = 8080;

    //轮询器 Selector 大堂经理
    private Selector selector;
    //缓冲区 Buffer 等候区
    private ByteBuffer buffer = ByteBuffer.allocate(1024);

    //初始化完毕
    public NIOServerDemo(int port){
        try {
            this.port = port;

            //远程服务器通道
            ServerSocketChannel server = ServerSocketChannel.open();
            //我得告诉地址
            server.bind(new InetSocketAddress(this.port));
            //BIO 升级版本 NIO,为了兼容BIO,NIO模型默认是采用阻塞式
            server.configureBlocking(false);

            //轮训器
            selector = Selector.open();

            //轮训器属于该通道
            server.register(selector, SelectionKey.OP_ACCEPT);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void listen(){
        System.out.println("listen on " + this.port + ".");
        try {
            //轮询主线程
            while (true){
                //通道个数,当没有通道则阻塞,有则继续进行
                selector.select();
                //每次都拿到所有的号子
                Set<SelectionKey> keys = selector.selectedKeys();
                Iterator<SelectionKey> iter = keys.iterator();
                //不断地迭代,就叫轮询
                //同步体现在这里,因为每次只能拿一个key,每次只能处理一种状态
                while (iter.hasNext()){
                    SelectionKey key = iter.next();
                    iter.remove();
                    //每一个key代表一种状态
                    //数据就绪、数据可读、数据可写 等等等等
                    process(key);
                }
                
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    //具体办业务的方法,坐班柜员
    //在同一时间点,只能干一件事
    private void process(SelectionKey key) throws IOException {
        //针对于每一种状态给一个反应
        if(key.isAcceptable()){
            ServerSocketChannel server = (ServerSocketChannel)key.channel();
            //这个方法体现非阻塞,不管你数据有没有准备好
            //你给我一个状态和反馈
            SocketChannel channel = server.accept();
            //一定一定要记得设置为非阻塞
            channel.configureBlocking(false);
            //当数据准备就绪的时候,将状态改为可读
            channel.register(selector,SelectionKey.OP_READ);
        }
        else if(key.isReadable()){
            //key.channel 从多路复用器中拿到客户端的引用
            SocketChannel channel = (SocketChannel)key.channel();
            int len = channel.read(buffer);
            if(len > 0){
                buffer.flip();
                String content = new String(buffer.array(),0,len);
                key = channel.register(selector,SelectionKey.OP_WRITE);
                //在key上携带一个附件,一会再写出去
                key.attach(content);
                System.out.println("读取内容:" + content);
            }
        }
        else if(key.isWritable()){
            SocketChannel channel = (SocketChannel)key.channel();

            String content = (String)key.attachment();
            channel.write(ByteBuffer.wrap(("输出:" + content).getBytes()));

            channel.close();
        }
    }
    public static void main(String[] args) {
        new NIOServerDemo(8080).listen();
    }
}

2 客户端
同BIO客户端

AIO

1 服务端

public class AIOServer {
    private final int port;
    public static void main(String args[]) {
        int port = 8000;
        new AIOServer(port);
    }
    public AIOServer(int port) {
        this.port = port;
        listen();
    }
    private void listen() {
        try {
            ExecutorService executorService = Executors.newCachedThreadPool();
            AsynchronousChannelGroup threadGroup = AsynchronousChannelGroup.withCachedThreadPool(executorService, 1);
            //开门营业
            //工作线程,用来侦听回调的,事件响应的时候需要回调
            final AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open(threadGroup);
            server.bind(new InetSocketAddress(port));
            System.out.println("服务已启动,监听端口" + port);

            //准备接受数据
            server.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>(){
                final ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
                //实现completed方法来回调
                //由操作系统来触发
                //回调有两个状态,成功
                public void completed(AsynchronousSocketChannel result, Object attachment){
                    System.out.println("IO操作成功,开始获取数据");
                    try {
                        buffer.clear();
                        result.read(buffer).get();
                        buffer.flip();
                        result.write(buffer);
                        buffer.flip();
                    } catch (Exception e) {
                        System.out.println(e.toString());
                    } finally {
                        try {
                            result.close();
                            server.accept(null, this);
                        } catch (Exception e) {
                            System.out.println(e.toString());
                        }
                    }

                    System.out.println("操作完成");
                }

                @Override
                //回调有两个状态,失败
                public void failed(Throwable exc, Object attachment) {
                    System.out.println("IO操作是失败: " + exc);
                }
            });

            try {
                Thread.sleep(Integer.MAX_VALUE);
            } catch (InterruptedException ex) {
                System.out.println(ex);
            }
        } catch (IOException e) {
            System.out.println(e);
        }
    }
}

2 客户端

public class AIOClient {
    private final AsynchronousSocketChannel client;

    public AIOClient() throws Exception{
        client = AsynchronousSocketChannel.open();
    }

    public void connect(String host,int port)throws Exception{
        client.connect(new InetSocketAddress(host,port),null,new CompletionHandler<Void,Void>() {
            @Override
            public void completed(Void result, Void attachment) {
                try {
                    client.write(ByteBuffer.wrap("这是一条测试数据".getBytes())).get();
                    System.out.println("已发送至服务器");
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }

            @Override
            public void failed(Throwable exc, Void attachment) {
                exc.printStackTrace();
            }
        });
        final ByteBuffer bb = ByteBuffer.allocate(1024);
        client.read(bb, null, new CompletionHandler<Integer,Object>(){

                    @Override
                    public void completed(Integer result, Object attachment) {
                        System.out.println("IO操作完成" + result);
                        System.out.println("获取反馈结果" + new String(bb.array()));
                    }

                    @Override
                    public void failed(Throwable exc, Object attachment) {
                        exc.printStackTrace();
                    }
                }
        );

        try {
            Thread.sleep(Integer.MAX_VALUE);
        } catch (InterruptedException ex) {
            System.out.println(ex);
        }

    }
    public static void main(String args[])throws Exception{
        new AIOClient().connect("localhost",8000);
    }
}

相关文章

  • Java IO 演进之路

    必须明白的几个概念 阻塞(Block)和非阻塞(Non-Block) 阻塞和非阻塞是在缓冲区数据没有准备就绪的情况...

  • NIO

    JavaIO演进之路 IO基础入门 Linux 网络IO模型简介 linux内核把所有的外部设备都看做一个文件,对...

  • IO 演进

    写程序难免与 IO 打交道,无论是文件 IO、网络 IO,涉及到数据的存储、交换,大多数情况都会见到IO 的身影。...

  • Netty

    Netty 之 Java 的 I/O 演进之路:http://www.jianshu.com/p/22f8586b...

  • 好文推荐java工程师成神之路

    java工程师成神之路https://hollischuang.github.io/toBeTopJavaer/#/

  • java知识梳理学习站

    1、java工程师成神之路 gitee在线地址:http://hollischuang.gitee.io/tobe...

  • Java的IO和NIO

    Java的IO和NIO 一、Java的IO Java的IO功能在java.io包下,包括输入、输出两种IO流,每种...

  • Java 的 I/O演进之路

    1 I/O 基础 1.1 基本概念 同步与异步: 同步是一种可靠的有序运行机制,当我们进行同步操作时,后续的任务是...

  • java中文件的操作

    前言:java的读写操作是学java开发的必经之路,Java.io包中包括许多类提供许多有关文件的各个方面操作。下...

  • Java Socket IO演进(一)-BIO/NIO/AIO

    1. 概览 Java中主要有三种IO模型,分别是同步阻塞IO(BIO)、同步非阻塞IO(NIO)、异步非阻塞IO(...

网友评论

      本文标题:Java IO 演进之路

      本文链接:https://www.haomeiwen.com/subject/jufvyhtx.html