美文网首页
NIO入门(Buffer & Channel & Selecto

NIO入门(Buffer & Channel & Selecto

作者: 宫新程 | 来源:发表于2019-12-08 18:13 被阅读0次

    个人学习笔记,源码链接:https://github.com/GongXincheng/gxc-nio-netty

    1:Buffer 缓冲区

    1.1:Buffer 中的属性

    规则:mark <= position <= limit <= capacity
    
    capacity:容量,表示缓冲区中最大存储数据的容量,一旦声明无法更改。
    
    limit:界限,表示缓冲区中可以操作数据的大小(limit后的数据不能读写)。
    
    position:位置,表示缓存区中正在操作数据的位置。
    
    mark:标记,表示当前position的位置,可以通过reset()恢复到 mark 位置
    
    

    1.2:ByteBuffer中常用方法

    /**
    * 分配新的字节缓冲区
    */
    ByteBuffer allocate(int cap) {
        capacity = cap; 
        limit = cap;
        position = 0;
        mark = -1;
    } 
                    
    /**
    * 将写模式转换为读模式
    */
    Buffer flip() {
        limit = position; 
        position = 0;
        mark = -1;
    }
                    
    /**
    * 可重复读数据
    */
    Buffer rewind() {
        position = 0;
        mark = -1;
    }
    
    /**
    * 清空缓冲区,但是缓冲区中的数据都还在,处于"被遗忘"状态
    */
    Buffer clear() {
        position = 0;
        limit = capacity;
        mark = -1;
    }
                   
    /**
    * 标记position的位置
    */
    Buffer mark() {
        mark = position;
    }
                    
    /**
    * position恢复到mark的位置
    */
    Buffer reset() {
        position = mark
    }
    
    /**
    * 缓冲区中时候还有可读数据
    */
    boolean hasRemaining() {
        return position < limit
    }
    
    /**
    * 可读数据的个数
    */
    int remaining() {
        return limit - position
    }
    

    1.3:直接缓冲区和非直接缓冲区

    非直接缓冲区:通过allocate()方法分配缓冲区,将缓冲区建立在JVM的内存中

    直接缓冲区:通过allocateDirect()方法分配直接缓冲区,将缓冲区建立在物理内存中

    直接缓冲区
    非直接缓冲区

    2:Channel 通道

    2.1:Channel简介

        用于 源节点 与 目标节点 的连接,在 Java NIO 中负责缓冲区中的数据传输。
        Channel 本身不存储数据,因此需要配合 Buffer 进行传输
    
    Channel

    2.2:Channel 的主要实现类

    java.nio.channels.Channel 接口
        |-- FileChanel
        |-- SelectableChannel
            |-- SocketChanel
            |-- ServerSockerChanel
            |-- DatagramChanel
    

    2.3:获取 Channel 的方式

    1:Java 针对支持通道的类提供了 getChannel() 方法
    eg: 
        本地IO:
            FileInputStream / FileOutputStream
            RandomAccessFile
        网络IO:
            Socket
            ServerSocket
            DatagramSocket
            
    2:在 JDK 1.7 中的 NIO.2 针对各个通道提供了静态方法 open()
    
    3:在 JDK 1.7 中的 NIO.2 的 Files 工具类的 newByteChannel()
    
    

    2.4:利用通道完成文件复制(非直接缓冲区)

    @Test
    public void testUseChannelToCopyFile() throws Exception {
        FileInputStream fis = new FileInputStream("/Users/gxc/tmp/sl.mp4");
        FileOutputStream fos = new FileOutputStream("/Users/gxc/tmp/sl2.mp4");
    
        // 1:获取通道
        FileChannel inChannel = fis.getChannel();
        FileChannel outChannel = fos.getChannel();
    
        // 2:分配指定大小的缓冲区
        ByteBuffer buf = ByteBuffer.allocate(1024);
    
        // 3:将通道中的数据存入到缓冲区中
        while(inChannel.read(buf) != -1) {
            // 将 buffer 的写模式切换成读模式
            buf.flip();
            // 4:将缓冲区中的数据写入到通道中
            outChannel.write(buf);
            // 清除缓冲区
            buf.clear();
        }
    
        // 5:关闭资源
        outChannel.close();
        inChannel.close();
        fos.close();
        fis.close();
    }
    

    2.5:使用直接缓冲区完成文件的复制,直接缓冲区(内存映射文件)

    @Test
    public void testUseChannelToCopyFile2() throws Exception {
        FileChannel inChannel = FileChannel.open(
                Paths.get("/Users/gxc/tmp/sl.mp4"),
                StandardOpenOption.READ);
    
        FileChannel outChannel = FileChannel.open(
                Paths.get("/Users/gxc/tmp/sl3.mp4"),
                StandardOpenOption.READ,
                StandardOpenOption.WRITE,
                StandardOpenOption.CREATE_NEW);
    
        // 内存映射文件
        MappedByteBuffer inMappedBuf = inChannel.map(
                FileChannel.MapMode.READ_ONLY, 0,
                inChannel.size());
    
        MappedByteBuffer outMappedBuf = outChannel.map(
                FileChannel.MapMode.READ_WRITE, 0,
                inChannel.size());
    
        // 直接对缓冲区 进行数据的读写操作
        byte[] dst = new byte[inMappedBuf.limit()];
        inMappedBuf.get(dst);
        outMappedBuf.put(dst);
    
        inChannel.close();
        outChannel.close();
    }
    

    2.6:通道之间的数据传输(直接缓冲区)

    @Test
    public void testUseChannelToCopyFile3() throws Exception {
        FileChannel inChannel = FileChannel.open(
                Paths.get("/Users/gxc/tmp/sl.mp4"),
                StandardOpenOption.READ);
    
        FileChannel outChannel = FileChannel.open(
                Paths.get("/Users/gxc/tmp/sl4.mp4"),
                StandardOpenOption.READ,
                StandardOpenOption.WRITE,
                StandardOpenOption.CREATE);
    
        //inChannel.transferTo(0, inChannel.size(), outChannel);
        outChannel.transferFrom(inChannel, 0, inChannel.size());
    
        inChannel.close();
        outChannel.close();
    }
    

    2.7:分散(Scatter)与聚集(Gather)

    分散读取(Scattering Reads):将通道中的数据分散到多个缓冲区中
    
    聚集写入(Gathering Writes):将多个缓冲区中的数据聚集到通道中
    
    @Test
    public void testUseChannelToCopyFile4() throws Exception {
        RandomAccessFile raf = new RandomAccessFile("1.txt", "rw");
    
        // 1:获取通道
        FileChannel channel = raf.getChannel();
    
        // 2:分配指定大小的缓冲区
        ByteBuffer buf1 = ByteBuffer.allocate(100);
        ByteBuffer buf2 = ByteBuffer.allocate(1024);
    
        // 3:分散读取
        ByteBuffer[] bufs = {buf1, buf2};
        channel.read(bufs);
    
        for (ByteBuffer buf : bufs) {
            buf.flip();
        }
        System.out.println(
        new String(bufs[0].array(), 0, bufs[0].limit()));
        System.out.println("----------------------");
        System.out.println(
        new String(bufs[1].array(), 0, bufs[1].limit()));
    
        // 4:聚集写入
        RandomAccessFile raf2 = new RandomAccessFile("1-1.txt", "rw");
        FileChannel channel2 = raf2.getChannel();
    
        channel2.write(bufs);
    }
    

    2.8:字符集 Charset

    编码:字符串 -> 字节数组
    
    解码:字节数组 -> 字符串
    
    @Test
    public void test() throws CharacterCodingException {
        Charset cs1 = StandardCharsets.UTF_8;
    
        // 获取编码器和解码器
        CharsetEncoder ce = cs1.newEncoder();
        CharsetDecoder cd = cs1.newDecoder();
    
        CharBuffer cb = CharBuffer.allocate(1024);
        cb.put("GongXincheng测试");
        cb.flip();
    
        // 编码
        ByteBuffer byteBuffer = ce.encode(cb);
        for (int i = 0; i < byteBuffer.limit(); i++) {
            System.out.println(byteBuffer.get());
        }
    
        byteBuffer.flip();
    
        // 解码
        System.out.println("------------");
        CharBuffer charBuffer = cd.decode(byteBuffer);
        System.out.println(charBuffer.toString());
    }
    

    3:Selector 选择器

    是 SelectableChannel 的多路复用器。用于监控Selectable的 IO 状况
    
    java.nio.channels.Channel 接口
        |-- FileChanel
        |-- SelectableChannel
            |-- SocketChanel        
            |-- ServerSockerChanel
            |-- DatagramChanel
    
            |-- Pipe.SinkChannel
            |-- Pipe.SourceChannel
    

    3.1:阻塞式NIO网络通讯(传输图片)

    /**
    * 客户端
    *   1:先创建Socket通道,并设置ip和端口号
    *   2:分配指定大小的缓冲区
    *   3:创建本地文件的通道,循环读取数据到缓冲区中
    *   4:将缓冲区的数据写到Socket通道中去
    *   5:关闭资源
    */
    @Test
    public void client() throws Exception {
        // 1:获取通道
        SocketChannel socketChannel = SocketChannel.open(
                new InetSocketAddress("127.0.0.1", 10000));
    
        // 2:分配指定大小的缓冲区
        ByteBuffer buffer = ByteBuffer.allocate(1024);
    
        // 3:读取本地文件, 并发送到服务端
        FileChannel inChannel = FileChannel.open(
            Paths.get("1.png"), StandardOpenOption.READ);
        while (inChannel.read(buffer) != -1) {
            // 切换成读数据模式
            buffer.flip();
            socketChannel.write(buffer);
            buffer.clear();
        }
    
        // 关闭通道
        inChannel.close();
        socketChannel.close();
    }
    
    
    /**
     * 服务端
     *  1:获取服务端Socket通道,并绑定ip
     *  2:获取客户端连接的通道
     *  3:创建文件通道,并将文件保存到本地
     *  4:关闭资源
     */
    @Test
    public void server() throws Exception {
        // 1:获取服务端Socket通道
        ServerSocketChannel ssChannel = ServerSocketChannel.open();
        // 2:服务端绑定端口号
        ssChannel.bind(new InetSocketAddress(10000));
    
        // 3:获取客户端连接的通道
        SocketChannel socketChannel = ssChannel.accept();
    
        // 4:接收客户端的数据,并保存到本地
        FileChannel fileChannel = FileChannel.open(
                Paths.get("2.png"),
                StandardOpenOption.WRITE,
                StandardOpenOption.CREATE);
                
        // 分配指定大小的缓冲区
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        while (socketChannel.read(buffer) != -1) {
            buffer.flip();
            fileChannel.write(buffer);
            buffer.clear();
        }
    
        fileChannel.close();
        socketChannel.close();
        ssChannel.close();
    }
    

    3.2:阻塞式NIO网络通讯(服务端相应客户端内容)

    /**
     * 客户端
     */
    @Test
    public void client() throws Exception {
        SocketChannel socketChannel = SocketChannel.open(
                new InetSocketAddress("127.0.0.1", 10000));
        FileChannel fileChannel = FileChannel.open(
                Paths.get("1.png"), StandardOpenOption.READ);
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        while (fileChannel.read(buffer) != -1) {
            buffer.flip();
            socketChannel.write(buffer);
            buffer.clear();
        }
    
        // !告诉服务端,客户端已经发完数据,否则服务端会一直处于阻塞状态!
        socketChannel.shutdownOutput();
    
        // 接收服务端反馈
        while (socketChannel.read(buffer) != -1) {
            buffer.flip();
            System.out.println(
                new String(buffer.array(), 0, buffer.limit()));
            buffer.clear();
        }
    
        fileChannel.close();
        socketChannel.close();
    }
    
    /**
     * 服务端
     */
    @Test
    public void server() throws IOException {
        ServerSocketChannel serverSocketChannel =
            ServerSocketChannel.open();
        serverSocketChannel.bind(new InetSocketAddress(10000));
    
        SocketChannel socketChannel = serverSocketChannel.accept();
        FileChannel fileChannel = FileChannel.open(
            Paths.get("3.png"), 
            StandardOpenOption.WRITE, 
            StandardOpenOption.CREATE);
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        while (socketChannel.read(buffer) != -1) {
            buffer.flip();
            fileChannel.write(buffer);
            buffer.clear();
        }
    
        // 发送反馈给客户端
        buffer.put("服务端接收客户端数据成功".getBytes());
        buffer.flip();
        socketChannel.write(buffer);
    
        fileChannel.close();
        socketChannel.close();
        serverSocketChannel.close();
    }
    

    3.3:非阻塞式NIO网络通讯

    /**
    * 客户端
    */
    @Test
    public void client() throws Exception {
        // 1:获取Socket通道
        SocketChannel sChannel = SocketChannel.open(
                new InetSocketAddress("127.0.0.1", 10000));
    
        // 2:切换成非阻塞模式
        sChannel.configureBlocking(false);
    
        // 3:分配指定大小的缓冲区
        ByteBuffer buffer = ByteBuffer.allocate(1024);
    
        // 4:发送数据给服务端
        buffer.put((LocalDateTime.now().toString() 
            + "\n" + "Hello World").getBytes());
        buffer.flip();
        sChannel.write(buffer);
        buffer.clear();
        
        // 5:关闭通道
        sChannel.close();
    }
    
    /**
     * 服务端
     */
    @Test
    public void server() throws Exception {
        // 1:获取通道
        ServerSocketChannel serverSocketChannel = 
            ServerSocketChannel.open();
        // 2:切换到非阻塞模式
        serverSocketChannel.configureBlocking(false);
        // 3:绑定连接
        serverSocketChannel.bind(new InetSocketAddress(10000));
        // 4:获取选择器
        Selector selector = Selector.open();
        // 5:将通道注册到选择器上,并且指定"监听接收事件"
        serverSocketChannel.register(selector,
            SelectionKey.OP_ACCEPT);
        // 6:轮询式的获取选择器上已经"准备就绪"的事件
        while (selector.select() > 0) {
            // 7:获取当前selector中所有注册的"选择键(已就绪的监听事件)"
            Iterator<SelectionKey> iterator =
                selector.selectedKeys().iterator();
            while (iterator.hasNext()) {
                // 8:获取准备"就绪"的事件
                SelectionKey selectionKey = iterator.next();
                // 9:判断具体是哪种事件类型
                if(selectionKey.isAcceptable()) {
                    // 10:若接收事件"就绪",获取客户端连接
                    SocketChannel sChannel =
                        serverSocketChannel.accept();
                    // 11:将客户端通道切换成非阻塞模式
                    sChannel.configureBlocking(false);
                    // 12:将该 客户端通道 注册到Selector上,监听"读就绪"状态
                    sChannel.register(selector, 
                        SelectionKey.OP_READ);
                } else if (selectionKey.isReadable()) {
                    // 13:获取当前Selector上"读就绪"的通道
                    SocketChannel socketChannel = 
                        (SocketChannel) selectionKey.channel();
                    // 14:读取数据
                    ByteBuffer buffer = ByteBuffer.allocate(1024);
                    while (socketChannel.read(buffer) != -1) {
                        buffer.flip();
                        System.out.println(
                            new String(buffer.array(), 0,
                                buffer.limit()));
                        buffer.clear();
                    }
                }
                // 15:取消选择键 SelectionKey
                iterator.remove();
            }
        }
    }
    
    非阻塞服务端代码

    3.4:非阻塞式NIO网络通讯(UDP)

    /**
     * 发送端.
     */
    @Test
    public void send() throws Exception {
        DatagramChannel datagramChannel = DatagramChannel.open();
        datagramChannel.configureBlocking(false);
    
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        Scanner sc = new Scanner(System.in);
        while (sc.hasNext()) {
            String message = sc.next();
            buffer.put(message.getBytes());
    
            buffer.flip();
            datagramChannel.send(buffer,
                    new InetSocketAddress("127.0.0.1", 10001));
            buffer.clear();
        }
    
        sc.close();
        datagramChannel.close();
    }
    
    /**
     * 接收端
     */
    @Test
    public void receive() throws Exception {
        DatagramChannel datagramChannel = DatagramChannel.open();
        datagramChannel.configureBlocking(false);
        datagramChannel.bind(new InetSocketAddress(10001));
        
        Selector selector = Selector.open();
        datagramChannel.register(selector, SelectionKey.OP_READ);
    
        while (selector.select() > 0) {
            Set<SelectionKey> skSet = selector.selectedKeys();
            Iterator<SelectionKey> iterator = skSet.iterator();
            while (iterator.hasNext()) {
                SelectionKey sk = iterator.next();
                if (sk.isReadable()) {
                    ByteBuffer buffer = ByteBuffer.allocate(1024);
                    datagramChannel.receive(buffer);
                    buffer.flip();
                    System.out.println(new String(
                            buffer.array(), 0, buffer.limit()));
                    buffer.clear();
                }
                iterator.remove();
            }
        }
    }
    

    3.5:Pipe管道

    @Test
    public void testPipe() throws Exception {
        // 1:获取管道
        Pipe pipe = Pipe.open();
    
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        buffer.put("通过单向管道发送数据".getBytes());
        buffer.flip();
    
        // 2:将缓冲区的数据接入管道
        Pipe.SinkChannel sinkChannel = pipe.sink();
        sinkChannel.write(buffer);
    
        // 3:读取缓冲区中的数据
        buffer.flip();
        Pipe.SourceChannel sourceChannel = pipe.source();
        sourceChannel.read(buffer);
        System.out.println(
            new String(buffer.array(), 0, buffer.limit()));
    }
    

    相关文章

      网友评论

          本文标题:NIO入门(Buffer & Channel & Selecto

          本文链接:https://www.haomeiwen.com/subject/hagmgctx.html