美文网首页IO
【NIO】NIO版本的鸿儒聊天室

【NIO】NIO版本的鸿儒聊天室

作者: 垃圾简书_吃枣药丸 | 来源:发表于2020-07-09 18:43 被阅读0次

    # 需求

    • 基于NIO实现
    • 支持同时多个客户端接入
    • 支持客户端发送文本消息到服务器
    • 支持客户端自定义群聊名称
    • 接收到客户端发送的消息之后,服务器需要将消息转发给目前在线的所有其他客户端
    • 支持客户端退出群聊
    • 服务端停止服务后,客户端自动断开连接

    # 技术介绍

    • Non-blockingI/O 编程模型
    • Channel 通道
      • ServerSocketChannel 服务端通道
      • SocketChannel 客户端通道
    • ByteBuffer NIO中使用的读写缓冲区
    • Selector 多路复用器
      • channel注册在多路复用器上,并监听相应的事件
    • 多线程
    • 线程池

    # 代码

    温馨提示:注意看代码注释哟~ 跟上节奏,很简单😼

    • 服务器
    
    /**
     * 基于NIO实现的聊天室服务端
     *
     * @author futao
     * @date 2020/7/8
     */
    @Slf4j
    public class NioChatServer {
    
        /**
         * 用于处理通道上的事件的线程池(可选的)
         */
        private static final ExecutorService THREAD_POOL = Executors.newFixedThreadPool(10);
    
        /**
         * 启动聊天室
         */
        public void start() {
            try {
                //服务端Socket通道
                ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
                //将通道设置成非阻塞
                serverSocketChannel.configureBlocking(false);
                //绑定主机与监听端口
                serverSocketChannel.bind(new InetSocketAddress("localhost", Constants.SERVER_PORT));
    
                //多路复用器
                Selector selector = Selector.open();
    
                //将服务端通道注册到多路复用器上,并设置监听事件接入事件
                serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
    
                log.debug("{} 基于NIO的聊天室在[{}]端口启动成功 {}", StringUtils.repeat("=", 30), Constants.SERVER_PORT, StringUtils.repeat("=", 30));
    
                while (true) {
                    // 触发了事件的通道数量,该方法会阻塞
                    int eventCountTriggered = selector.select();
                    if (eventCountTriggered <= 0) {
                        continue;
                    }
                    // 获取到所有触发的事件
                    Set<SelectionKey> selectionKeys = selector.selectedKeys();
                    // 遍历事件进行处理
                    for (SelectionKey selectionKey : selectionKeys) {
                        // 处理事件
                        selectionKeyHandler(selectionKey, selector);
                    }
                    // 清除事件记录
                    selectionKeys.clear();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        /**
         * 事件处理器
         *
         * @param selectionKey 触发的事件信息
         * @param selector     多路复用器
         */
        private void selectionKeyHandler(SelectionKey selectionKey, Selector selector) {
            if (selectionKey.isAcceptable()) {
                //如果触发的是SocketChannel接入事件
                try {
                    // ServerSocketChannel上触发的客户端SocketChannel接入
                    SocketChannel socketChannel = ((ServerSocketChannel) selectionKey.channel()).accept();
                    log.debug("客户端[{}]成功接入聊天服务器", socketChannel.socket().getPort());
                    // 将客户端SocketChannel通道设置成非阻塞
                    socketChannel.configureBlocking(false);
                    // 将客户端通道注册到多路复用器,并监听这个通道上发生的可读事件
                    socketChannel.register(selector, SelectionKey.OP_READ);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            } else if (selectionKey.isReadable()) {
                // 触发的是可读事件
                // 获取到可读事件的客户端通道
                SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
                //创建缓冲区
                ByteBuffer byteBuffer = ByteBuffer.allocate(1024 * 4);
                try {
                    // 读取通道上的数据写入缓冲区(返回0或者-1说明读到了末尾)
                    while (socketChannel.read(byteBuffer) > 0) {
                    }
                    //切换为读模式
                    byteBuffer.flip();
                    // 接收到的消息
                    String message = String.valueOf(Constants.CHARSET.decode(byteBuffer));
                    log.info("接收到来自客户端[{}]的数据:[{}]", socketChannel.socket().getPort(), message);
                    // 是否退出
                    quit(message, selector, selectionKey);
                    // 消息转发
                    forwardMessage(message, selector, selectionKey);
                    // 清除缓冲区的数据
                    byteBuffer.clear();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    
        /**
         * 客户端退出
         *
         * @param message      消息
         * @param selector     多路复用器
         * @param selectionKey 触发的selectionKey
         */
        public void quit(String message, Selector selector, SelectionKey selectionKey) {
            if (StringUtils.isBlank(message) || Constants.KEY_WORD_QUIT.equals(message)) {
                int port = ((SocketChannel) selectionKey.channel()).socket().getPort();
                // 客户端下线
                selectionKey.cancel();
                log.debug("客户端[{}]下线", port);
                // 因为发生了监听事件和channel的变更,所以需要通知selector重新整理selector所监听的事件
                selector.wakeup();
            }
        }
    
        /**
         * 转发消息
         *
         * @param message         需要转发的消息
         * @param selector        多路复用器
         * @param curSelectionKey 当前触发的selectionKey
         */
        public void forwardMessage(String message, Selector selector, SelectionKey curSelectionKey) {
            // 创建缓冲区
            ByteBuffer byteBuffer = ByteBuffer.allocate(1024 * 4);
            // 数据写入缓冲区
            byteBuffer.put(message.getBytes(Constants.CHARSET));
    
            // 切换为读模式
            byteBuffer.flip();
            // 在首尾进行标记,因为需要给每个客户端发送同样的数据,需要重复读取
            byteBuffer.mark();
            // 当前注册在多路复用器上的SelectionKey集合
            Set<SelectionKey> keys = selector.keys();
            for (SelectionKey key : keys) {
                // 消息不能转发给自己 and 只转发给客户端SocketChannel
                if (curSelectionKey.equals(key) || !(key.channel() instanceof SocketChannel)) {
                    continue;
                }
                // 客户端SocketChannel
                SocketChannel socketChannel = (SocketChannel) key.channel();
                // 如果缓冲区中还有数据就一直写
                while (byteBuffer.hasRemaining()) {
                    try {
                        // 数据写入通道
                        socketChannel.write(byteBuffer);
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
                // 重置到上次mark的地方,即首位
                byteBuffer.reset();
            }
            // 清除缓冲区的数据
            byteBuffer.clear();
        }
    
    
        public static void main(String[] args) {
            new NioChatServer().start();
        }
    }
    
    • 客户端
    
    /**
     * 基于NIO实现的群聊客户端
     *
     * @author futao
     * @date 2020/7/8
     */
    @Getter
    @Setter
    @Slf4j
    public class NioChatClient {
    
        /**
         * 用于处理用户输入数据的单个线程线程池,使用线程池是为了便于关闭
         */
        private static final ExecutorService USER_INPUT_HANDLER = Executors.newSingleThreadExecutor();
    
        /**
         * 用户名
         */
        private String userName;
    
        /**
         * 启动客户端
         */
        public void start() {
            try {
                // 创建客户端通道
                SocketChannel socketChannel = SocketChannel.open();
                // 将通道设置为非阻塞
                socketChannel.configureBlocking(false);
    
                // 创建多路复用器
                Selector selector = Selector.open();
    
                // 将客户端通道注册到多路复用器,并监听可读事件
                socketChannel.register(selector, SelectionKey.OP_CONNECT);
    
                // 尝试连接到聊天服务器
                socketChannel.connect(new InetSocketAddress("localhost", Constants.SERVER_PORT));
    
                while (true) {
                    // 阻塞等待通道上的事件触发。返回触发的通道的数量
                    int eventCountTriggered = selector.select();
                    if (eventCountTriggered <= 0) {
                        continue;
                    }
                    // 获取到所有触发的事件
                    Set<SelectionKey> selectionKeys = selector.selectedKeys();
                    // 遍历事件进行处理
                    for (SelectionKey selectionKey : selectionKeys) {
                        // 处理事件
                        selectionKeyHandler(selectionKey, selector);
                    }
                    // 清除事件记录
                    selectionKeys.clear();
                }
            } catch (IOException e) {
                e.printStackTrace();
            } catch (ClosedSelectorException e) {
                log.debug("成功退出聊天室...");
            }
        }
    
        /**
         * 处理器
         *
         * @param selectionKey 触发的selectionKey
         * @param selector     多路复用器
         */
        private void selectionKeyHandler(SelectionKey selectionKey, Selector selector) {
    
            if (selectionKey.isConnectable()) {
                // 触发的是成功接入服务器的事件
                SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
                try {
                    // 判断此通道上的连接操作是否正在进行中
                    if (socketChannel.isConnectionPending()) {
                        // 完成连接套接字通道的过程
                        socketChannel.finishConnect();
                        log.debug("成功接入聊天服务器");
    
                        // 将通道设置成非阻塞
                        socketChannel.configureBlocking(false);
                        // 将通道注册到多路复用器,并监听可读事件
                        socketChannel.register(selector, SelectionKey.OP_READ);
    
                        // 创建缓冲区,用于处理将用户输入的数据写入通道
                        ByteBuffer byteBuffer = ByteBuffer.allocate(4 * 1024);
                        // 在新线程中处理用户输入
                        USER_INPUT_HANDLER.execute(() -> {
                            while (!Thread.currentThread().isInterrupted()) {
                                //先清空缓冲区中的数据
                                byteBuffer.clear();
                                // 获取用户输入的文本
                                String message = new Scanner(System.in).nextLine();
                                // 将数据写入缓冲区
                                byteBuffer.put(String.format("【%s】: %s", userName, message).getBytes(Constants.CHARSET));
                                // 将缓冲区设置为读模式
                                byteBuffer.flip();
                                try {
                                    // 当缓冲区中还有数据
                                    while (byteBuffer.hasRemaining()) {
                                        // 将数据写入通道
                                        socketChannel.write(byteBuffer);
                                    }
                                } catch (IOException e) {
                                    e.printStackTrace();
                                }
    
                                // 判断是否退出群聊
                                if (quit(message, selector, selectionKey)) {
                                    // 跳出循环,结束线程
                                    break;
                                }
                            }
                            try {
                                // 关闭多路复用器
                                selector.close();
                            } catch (IOException e) {
                                e.printStackTrace();
                            }
                            // 关闭线程池
                            USER_INPUT_HANDLER.shutdown();
                        });
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            } else if (selectionKey.isReadable()) {
                // 触发的是可读事件
                // 获取到可读事件的通道
                SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
                //创建缓冲区
                ByteBuffer byteBuffer = ByteBuffer.allocate(1024 * 4);
                try {
                    // 将通道上的数据写入缓冲区(返回0或者-1说明读到了末尾)
                    while (socketChannel.read(byteBuffer) > 0) {
                    }
                    // 切换成读模式
                    byteBuffer.flip();
                    String message = String.valueOf(Constants.CHARSET.decode(byteBuffer));
                    byteBuffer.clear();
                    log.info("接收到数据:[{}]", message);
                    if (StringUtils.isBlank(message)) {
                        log.debug("服务器拉胯,下车...");
                        selector.close();
                        USER_INPUT_HANDLER.shutdownNow();
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    
        /**
         * 退出群聊
         *
         * @param message      消息
         * @param selector     多路复用器
         * @param selectionKey 触发的selectionKey
         * @return 是否退出
         */
        public boolean quit(String message, Selector selector, SelectionKey selectionKey) {
            if (Constants.KEY_WORD_QUIT.equals(message)) {
                selectionKey.cancel();
                selector.wakeup();
                return true;
            }
            return false;
        }
    
    
        public static void main(String[] args) {
            NioChatClient nioChatClient = new NioChatClient();
            nioChatClient.setUserName("小9");
            nioChatClient.start();
        }
    }
    

    # 测试

    • 接入
    image.png
    • 客户端发送消息
    image.png
    • 消息转发
    image.png
    image.png
    image.png
    image.png
    • 客户端下线
    image.png
    image.png
    • 服务器宕机
    image.png

    # 源代码

    * https://github.com/FutaoSmile/learn-IO/tree/master/practice/src/main/java/com/futao/practice/chatroom/nio
    

    # 系列文章

    欢迎在评论区留下你看文章时的思考,及时说出,有助于加深记忆和理解,还能和像你一样也喜欢这个话题的读者相遇~

    相关文章

      网友评论

        本文标题:【NIO】NIO版本的鸿儒聊天室

        本文链接:https://www.haomeiwen.com/subject/crdscktx.html