Java NIO

作者: 三丶斤 | 来源:发表于2019-07-08 23:01 被阅读0次

    每一种技术的出现,都是为了解决某一个或者某一类问题。让我们先来了解问题的产生。

    问题
    使用socket通信实现如下:
    1.client连接server
    2.client发送"Hi Server,I am client."
    3.server收到消息在控制台的打印,并回复"Hi client,I am Server."
    4.client收到消息在控制台打印。
    5.client断开连接。

    1.Simple Solution(方式一)

    直接贴代码了

    /**
     * @description: SimpleSolution server 
     * @author: sanjin
     * @date: 2019/7/8 11:33
     */
    public class Server {
        public static void main(String[] args) {
            // 服务端占用端口
            int port = 8000;
            // 创建 serversocker
            ServerSocket serverSocket = null;
            try {
                serverSocket = new ServerSocket(port);
            } catch (IOException e) {
                e.printStackTrace();
            }
            if (serverSocket != null) {
                while (true) {
                    InputStream is = null;
                    OutputStream os = null;
                    Socket client = null;
                    try {
                        // accept()方法会阻塞,直到有client连接后才会执行后面的代码
                        client = serverSocket.accept();
                        is = client.getInputStream();
                        os = client.getOutputStream();
    
                        // 3.server收到消息在控制台的打印,并回复"Hi client,I am Server."
                        byte[] buffer = new byte[5];
                        int len = 0;
                        // 使用ByteArrayOutputStream,避免缓冲区过小导致中文乱码
                        ByteArrayOutputStream baos = new ByteArrayOutputStream();
                        while ((len = is.read(buffer)) != -1) {
                            baos.write(buffer,0,len);
                        }
                        System.out.println(baos.toString());
                        // 服务端回复客户端消息
                        os.write("Hi client,I am Server.".getBytes());
                        os.flush(); // 刷新缓存,避免消息没有发送出去
                        client.shutdownOutput();
                    } catch (IOException e) {
                        e.printStackTrace();
                    } finally {
                        // 程序异常或者执行完成,关闭流,防止占用资源
                        if (client != null) {
                            try {
                                client.close();
                            } catch (IOException e) {
                                e.printStackTrace();
                            }
                        }
                        if (is != null) {
                            try {
                                is.close();
                            } catch (IOException e) {
                                e.printStackTrace();
                            }
                        }
                        if (os != null) {
                            try {
                                os.close();
                            } catch (IOException e) {
                                e.printStackTrace();
                            }
                        }
                    }
    
                }
            }
    
    
        }
    }
    
    
    /**
     * @description: SimpleSolution client
     * @author: sanjin
     * @date: 2019/7/8 11:33
     */
    public class Client {
        public static void main(String[] args) {
            int port = 8000;
            Socket client = null;
            InputStream is = null;
            OutputStream os = null;
            try {
                // 1.client连接server
                client = new Socket("localhost", port);
                is = client.getInputStream();
                os = client.getOutputStream();
    
                // 2.client发送"Hi Server,I am client."
                os.write("Hi Server,I am client.".getBytes());
                os.flush();
                // 调用shutdownOutput()方法表示客户端传输完了数据,否则服务端的
                // read()方法会一直阻塞
                // (你可能会问我这不是写了 read()!=-1, -1表示的文本文件的结尾字符串,而对于字节流数据,
                // 是没有 -1 标识的,这就会使服务端无法判断客户端是否发送完成,导致read()方法一直阻塞)
                client.shutdownOutput();
    
                // 4.client收到消息在控制台打印。
                int len = 0;
                byte[] buffer = new byte[5];
                ByteArrayOutputStream baos = new ByteArrayOutputStream();
                while ((len = is.read(buffer)) != -1) {
                    baos.write(buffer,0,len);
                }
                System.out.println(baos.toString());
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                // 程序异常或者执行完成,关闭流,防止占用资源
                try {
                    if (is != null) {
                        is.close();
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
                try {
                    if (os != null) {
                        os.close();
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
                try {
                    if (client != null) {
                        client.close();
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
    
    
        }
    }
    
    

    程序描述图:


    1.png

    顺便说一下,ProcessOn真的很好用😄

    关于Socket编程,有几个注意点:

    1. 注意使用流时一定要用try-catch-finally,虽然代码确实有点繁琐。
      2.客户端如果发送的使中文,在服务端接收数据时候,要注意接收方式:
    // 接收数据方式一
    byte[] buffer = new byte[5];
    int len = 0;
    // 使用ByteArrayOutputStream,避免缓冲区过小导致中文乱码
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    while ((len = is.read(buffer)) != -1) {
        baos.write(buffer, 0, len);
    }
    System.out.println(baos.toString());
    
    // 接收数据方式一
    byte[] buffer = new byte[5];
    int len = 0;
    // 使用ByteArrayOutputStream,避免缓冲区过小导致中文乱码
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    while ((len = is.read(buffer)) != -1) {
        // 这种方式会导致中文乱码
        System.out.println(new String(buffer, 0, len));
    }
    

    如果客户端传输中文使用方式二会导致中文乱码,这是因为我们在读取时候缓冲区大小设置的是5个字节,此处假设客户端传输“小白兔“三个字,常用的汉字一般占3个字节。”小白兔“发送过来后我们的缓冲区只有5个字节,没办法一次读取完,所以要分二次读取,第一次读取5个字节,然后立即进行了打印,汉字”小“会被正常打印,但是汉字”白“只读取了2个字节,打印就会产生乱码。而使用ByteArrayOutputStream把缓冲区读取的字节全都存放一起,然后一起打印,就不会导致乱码了。
    3.shutdownOutput()方法。当客户端传输”Hi Server,I am client.“,服务端接收数据并打印出来,然后向客户端发送”"Hi client,I am Server."。如果不使用shutdownOutput()方法会使服务端卡在read()方法。这是因为当客户端数据发送完成后,服务端的判断条件
    while ((len = is.read(buffer)) != -1)
    不成立,因为只有文本文件的末尾是 -1,而字节流没有末尾标识,这就导致服务端不知道客户端有没有发送完成,使得read()方法阻塞。所以客户端发送完数据后需要发送一个标识来表示”我已经发送完数据了“。而shutdownOutput()方法就是这个标识。

    我们使用socket完成了一个收发的程序。但是它还存在着问题。

    1. 不能同时有多个client连接我们的server
    服务端与客户端连接使用依靠accept()函数,而我们的服务端程序是单线程,只能等当前的socket执行完成后,才能接收下一个socket的连接。

    假设我们同时又2个client连接server会发生什么?(因为我们程序简单,执行的很快,所以我在server种加了Thread.sleep(50*1000))
    现象:第二个client会抛出异常:


    1.png

    下面我们就用多线程解决这个问题。


    2.Multithreading Solution(方式二)

    我又新加了一个HandlerClient类,实现Runnable接口,用于处理client连接,Client类的代码没有做修改。

    
    /**
     * @description: MultithreadingSolution client
     * @author: sanjin
     * @date: 2019/7/8 11:33
     */
    public class Client {
        public static void main(String[] args) {
            int port = 8000;
            Socket client = null;
            InputStream is = null;
            OutputStream os = null;
            try {
                // 1.client连接server
                client = new Socket("localhost", port);
                is = client.getInputStream();
                os = client.getOutputStream();
    
                // 2.client发送"Hi Server,I am client."
                os.write("Hi Server,I am client.".getBytes());
                os.flush();
                // 调用shutdownOutput()方法表示客户端传输完了数据,否则服务端的
                // read()方法会一直阻塞
                // (你可能会问我这不是写了 read()!=-1, -1表示的文本文件的结尾字符串,而对于字节流数据,
                // 是没有 -1 标识的,这就会使服务端无法判断客户端是否发送完成,导致read()方法一直阻塞)
                client.shutdownOutput();
    
                // 4.client收到消息在控制台打印。
                int len = 0;
                byte[] buffer = new byte[5];
                ByteArrayOutputStream baos = new ByteArrayOutputStream();
                while ((len = is.read(buffer)) != -1) {
                    baos.write(buffer,0,len);
                }
                System.out.println(baos.toString());
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                // 程序异常或者执行完成,关闭流,防止占用资源
                try {
                    if (is != null) {
                        is.close();
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
                try {
                    if (os != null) {
                        os.close();
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
                try {
                    if (client != null) {
                        client.close();
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
    
    
        }
    }
    
    /**
     * @description: MultithreadingSolution server
     * @author: sanjin
     * @date: 2019/7/8 11:33
     */
    public class Server {
        public static void main(String[] args) {
            // 服务端占用端口
            int port = 8000;
            // 创建 serversocker
            ServerSocket serverSocket = null;
            try {
                serverSocket = new ServerSocket(port);
            } catch (IOException e) {
                e.printStackTrace();
            }
            if (serverSocket != null) {
                while (true) {
                    try {
                        Socket client = serverSocket.accept();
                        System.out.println("收到client连接,client地址:"+client.getInetAddress());
                        new Thread(new HandlerClient(client)).start();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
    
    /**
     * @description: 用于处理client连接
     * @author: sanjin
     * @date: 2019/7/8 16:28
     */
    public class HandlerClient implements Runnable {
        private Socket client;
    
        public HandlerClient(Socket client) {
            this.client = client;
        }
    
        @Override
        public void run() {
            InputStream is = null;
            OutputStream os = null;
            try {
                is = client.getInputStream();
                os = client.getOutputStream();
    
                // 3.server收到消息在控制台的打印,并回复"Hi client,I am Server."
                byte[] buffer = new byte[5];
                int len = 0;
                // 使用ByteArrayOutputStream,避免缓冲区过小导致中文乱码
                ByteArrayOutputStream baos = new ByteArrayOutputStream();
                while ((len = is.read(buffer)) != -1) {
                    // 这种方式会导致中文乱码
                    // System.out.println(new String(buffer, 0, len));
                    baos.write(buffer, 0, len);
                }
                System.out.println(baos.toString());
    
                try {
                    // 增加任务执行时间,用于进行多个client连接测试
                    Thread.sleep(20*1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
    
                // 服务端回复客户端消息
                os.write("Hi client,I am Server.".getBytes());
                os.flush(); // 刷新缓存,避免消息没有发送出去
                client.shutdownOutput();
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                // 程序异常或者执行完成,关闭流,防止占用资源
                if (client != null) {
                    try {
                        client.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
                if (is != null) {
                    try {
                        is.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
                if (os != null) {
                    try {
                        os.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
    
        }
    }
    

    运行结果:


    1.png

    问题:计算机的CPU资源有限,来一个client就会创建一个线程,线程完成任务后再进行销毁,线程的创建、销毁以及线程上下文的切换会消耗很多CPU的资源。并且JVM中线程数过多还有可能抛出内存不足的异常。

    所以我们下一步使用线程池来解决这个问题。

    程序描述图:


    1.png

    3.Thread Pool Solution(方式三)

    线程池解决方法思路:


    1.png

    我们再方式二已经完成了多线程方式代码,将它修改成线程池方式非常简单,我们只需要修改Server类就可以了:

    
    /**
     * @description: ThreadPoolSolution server
     * @author: sanjin
     * @date: 2019/7/8 11:33
     */
    public class Server {
    
        // 创建线程池
        private static ThreadPoolExecutor threadPoolExecutor =
                new ThreadPoolExecutor(
                        5, // 核心线程数
                        10, // 最大线程数
                        200, // keep alive 时间
                        TimeUnit.HOURS, // keep alive 时间单位
                        new ArrayBlockingQueue<Runnable>(5) // 工作队列
                );
    
        public static void main(String[] args) {
            // 服务端占用端口
            int port = 8000;
            // 创建 serversocker
            ServerSocket serverSocket = null;
            try {
                serverSocket = new ServerSocket(port);
            } catch (IOException e) {
                e.printStackTrace();
            }
            if (serverSocket != null) {
                while (true) {
                    try {
                        Socket client = serverSocket.accept();
                        System.out.println("收到client连接,client地址:"+client.getInetAddress());
    
                        // 多线程方式
                        // new Thread(new HandlerClient(client)).start();
    
                        // 线程池方式
                        threadPoolExecutor.execute(new HandlerClient(client));
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
    

    不知道大家晕了没,我已经快不行了,但还是要明白我们使用多线程的目的:
    解决多个client同时连接的问题
    好了,下面主角登场。

    4.NIO(方式三)

    关于JavaNIO有一个非常好的英文资料:http://tutorials.jenkov.com/java-nio/index.html

    
    /**
     * @description:
     * @author: sanjin
     * @date: 2019/7/8 19:56
     */
    public class NIOClient {
    
        public static void main(String[] args) {
            SocketAddress socketAddress = new InetSocketAddress(8000);
            SocketChannel socketChannel = null;
            try {
                socketChannel = SocketChannel.open(socketAddress);
                socketChannel.configureBlocking(false);
                if (socketChannel.finishConnect()) {
    
                    ByteBuffer buffer = ByteBuffer.allocate(1024);
                    // 客户端发送数据 "Hi Server,I am client."
                    buffer.clear();
                    buffer.put("Hi Server,I am client.".getBytes());
                    buffer.flip();
                    while (buffer.hasRemaining()) {
                        socketChannel.write(buffer);
                    }
                    // 客户端接收服务端数据打印在控制台
    
                    buffer.clear();
                    int len = socketChannel.read(buffer);
                    while (len > 0) {
                        buffer.flip();
                        while (buffer.hasRemaining()) {
                            System.out.print((char) buffer.get());
                        }
                        System.out.println();
                        buffer.clear();
                        len = socketChannel.read(buffer);
                    }
                    if (len == -1) {
                        socketChannel.close();
                    }
                }
    
    
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                try {
                    if (socketChannel != null) {
                        socketChannel.close();
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    
    
    /**
     * @description:
     * @author: sanjin
     * @date: 2019/7/8 19:56
     */
    public class NIOServer {
    
    
        public static void main(String[] args) {
            ServerSocketChannel serverSocketChannel = null;
            Selector selector = null;
            try {
                // 初始化一个 serverSocketChannel
                serverSocketChannel = ServerSocketChannel.open();
                serverSocketChannel.bind(new InetSocketAddress(8000));
    
                // 设置serverSocketChannel为非阻塞模式
                // 即 select()会立即得到返回
                serverSocketChannel.configureBlocking(false);
    
                // 初始化一个 selector
                selector = Selector.open();
    
                // 将 serverSocketChannel 与 selector绑定
                serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
    
                while (true) {
                    // 通过操作系统监听变化的socket个数
                    // 在windows平台通过selector监听(轮询所有的socket进行判断,效率低)
                    // 在Linux2.6之后通过epool监听(事件驱动方式,效率高)
                    int count = selector.select(3000);
                    if (count > 0) {
                        Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                        while (iterator.hasNext()) {
                            SelectionKey key = iterator.next();
    
                            if (key.isAcceptable()) {
                                handleAccept(key);
                            }
                            if (key.isReadable()) {
                                handleRead(key);
                            }
                            if (key.isWritable() && key.isValid()) {
                                handleWrite(key);
                            }
                            if (key.isConnectable()) {
                                System.out.println("isConnectable = true");
                            }
                            iterator.remove();
                        }
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                try {
                    if (serverSocketChannel != null) {
                        serverSocketChannel.close();
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
                try {
                    if (selector != null) {
                        selector.close();
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    
        private static void handleWrite(SelectionKey key) {
            // 获取 client 的 socket
            SocketChannel clientChannel = (SocketChannel) key.channel();
            // 获取缓冲区
            ByteBuffer buffer = (ByteBuffer) key.attachment();
            buffer.clear();
            buffer.put("Hi client,I am Server.".getBytes());
            buffer.flip();
            try {
                while (buffer.hasRemaining()) {
                    clientChannel.write(buffer);
                }
                buffer.compact();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        private static void handleRead(SelectionKey key) {
            // 获取 readable 的客户端 socketChannel
            SocketChannel clientChannel = (SocketChannel) key.channel();
            // 读取客户端发送的消息信息,我们已经在 acceptable 中设置了缓冲区
            // 所以直接冲缓冲区读取信息
            ByteBuffer buffer = (ByteBuffer) key.attachment();
    
            // 获取 client 发送的消息
            try {
                int len = clientChannel.read(buffer);
                while (len > 0) {
                    // 设置 limit 位置
                    buffer.flip();
                    // 开始读取数据
                    while (buffer.hasRemaining()) {
                        byte b = buffer.get();
                        System.out.print((char) b);
                    }
                    System.out.println();
                    // 清除 position 位置
                    buffer.clear();
                    // 从新读取 len
                    len = clientChannel.read(buffer);
                }
                if (len == -1) {
                    clientChannel.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
    
    
    
        }
    
        private static void handleAccept(SelectionKey key) {
            // 获得 serverSocketChannel
            ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
            try {
                // 获得 socketChannel,就是client的socket
                SocketChannel clientChannel = serverSocketChannel.accept();
                if (clientChannel == null) return;
                // 设置 socketChannel 为无阻塞模式
                clientChannel.configureBlocking(false);
                // 将其注册到 selector 中,设置监听其是否可读,并分配缓冲区
                clientChannel.register(key.selector(), SelectionKey.OP_READ, ByteBuffer.allocateDirect(512));
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    
    

    相关文章

      网友评论

          本文标题:Java NIO

          本文链接:https://www.haomeiwen.com/subject/dpqohctx.html