美文网首页IO
【BIO】通过指定消息大小实现的多人聊天室-终极版本

【BIO】通过指定消息大小实现的多人聊天室-终极版本

作者: 垃圾简书_吃枣药丸 | 来源:发表于2020-07-06 15:23 被阅读0次
    # 前情提要:
    • 在上一篇文章BIO在聊天室项目中的演化中提到,告知对方消息已经发送完毕的方式有4种
      1. 关闭Socket连接
      2. 关闭输出流,socket.shutdownOutput();
      3. 使用标志符号,借助字符流,Reader.readLine(),该方法会在读取到\r,\n或者\r\n时返回所读取到的内容。
      4. 通过指定本次发送的数据的字节大小。告知对方从输入流中读取指定大小的字节。

    本文使用第四种方案来实现聊天室

    • 思路为:
      • 客户端在发送消息之前,先计算出本次发送的数据量的字节大小,比如为N个字节。那么在向服务器发送数据的前,先约定好流中的前1个字节(或者前X个字节,根据自己项目的实际情况来决定)为本次发送的数据量的大小。
      • 客户端发送消息,先将计算出的字节大小N写入输出流,再将实际的内容写入输出流。
      • 服务端在获取到输入流之后,根据约定,先读取前X个字节,根据这个字节的值可以知道,本次发送的数据量的大小,那么在读取数据时,只需要读取后续的N个字节即可。
    • 温馨提示: 注意看代码注释哟~

    # 代码实现

    • 客户端
    /**
     * @author futao
     * @date 2020/7/6
     */
    public class BioChatClient {
    
        private static final Logger logger = LoggerFactory.getLogger(BioChatClient.class);
    
        private static final ExecutorService SINGLE_THREAD_EXECUTOR = Executors.newSingleThreadExecutor();
    
        /**
         * 启动客户端
         */
        public void start() {
            try {  //尝试连接到聊天服务器
                Socket socket = new Socket("localhost", Constants.SERVER_PORT);
                logger.debug("========== 成功连接到聊天服务器 ==========");
    
                InputStream inputStream = socket.getInputStream();
                OutputStream outputStream = socket.getOutputStream();
    
                //从输入流中读取数据
                SINGLE_THREAD_EXECUTOR.execute(() -> {
                    try {
                        while (true) {
                            String message = IOUtils.messageReceiver(inputStream);
                            logger.info("接收到服务端消息:[{}]", message);
                        }
                    } catch (IOException e) {
                        logger.error("发生异常", e);
                    }
                });
    
                while (true) {
                    //获取用户输入的数据
                    String message = new Scanner(System.in).nextLine();
                    if (StringUtils.isBlank(message)) {
                        break;
                    }
                    //将内容转换为字节数组
                    byte[] contentBytes = message.getBytes(Constants.CHARSET);
                    //内容字节数组的大小
                    int length = contentBytes.length;
                    //第一个字节写入本次传输的数据量的大小
                    outputStream.write(length);
                    //写入真正需要传输的内容
                    outputStream.write(contentBytes);
                    //刷新缓冲区
                    outputStream.flush();
    
                    if (Constants.KEY_WORD_QUIT.equals(message)) {
                        //客户端退出
                        SINGLE_THREAD_EXECUTOR.shutdownNow();
                        inputStream.close();
                        outputStream.close();
                        socket.close();
                        break;
                    }
                }
            } catch (IOException e) {
                logger.error("发生异常", e);
            }
        }
    
        public static void main(String[] args) {
            new BioChatClient().start();
        }
    }
    
    • 从输入流中读取指定大小的数据
        /**
         * 从输入流中读取指定大小的字节数据并转换成字符串
         *
         * @param inputStream 输入流
         * @return 读取到的字符串
         * @throws IOException
         */
        public static String messageReceiver(InputStream inputStream) throws IOException {
            //本次传输的数据量的大小
            int curMessageLength = inputStream.read();
            byte[] contentBytes = new byte[curMessageLength];
            //读取指定长度的字节
            inputStream.read(contentBytes);
            return new String(contentBytes);
        }
    
    
    • 服务端
    /**
     * @author futao
     * @date 2020/7/6
     */
    public class BioChatServer {
    
        private static final Logger logger = LoggerFactory.getLogger(BioChatServer.class);
    
        /**
         * 可同时接入的客户端数量
         */
        private static final ExecutorService THREAD_POOL = Executors.newFixedThreadPool(10);
    
    
        /**
         * 当前接入的客户端
         */
        private static final Set<Socket> CLIENT_SOCKET_SET = new HashSet<Socket>() {
            @Override
            public synchronized boolean add(Socket o) {
                return super.add(o);
            }
    
            @Override
            public synchronized boolean remove(Object o) {
                return super.remove(o);
            }
        };
    
        /**
         * 启动服务端
         */
        public void start() {
            try {
                //启动服务器,监听端口
                ServerSocket serverSocket = new ServerSocket(Constants.SERVER_PORT);
                logger.debug("========== 基于BIO的聊天室在[{}]端口启动成功 ==========", Constants.SERVER_PORT);
                while (true) {
                    //监听客户端接入事件
                    Socket socket = serverSocket.accept();
                    THREAD_POOL.execute(() -> {
                        CLIENT_SOCKET_SET.add(socket);
                        int port = socket.getPort();
                        logger.debug("客户端[{}]成功接入聊天服务器", port);
                        try {
                            InputStream inputStream = socket.getInputStream();
                            OutputStream outputStream = socket.getOutputStream();
    
                            while (true) {
                                //获取到客户端发送的消息
                                String message = IOUtils.messageReceiver(inputStream);
                                logger.info("接收到客户端[{}]发送的消息:[{}]", port, message);
                                //客户端是否退出
                                boolean isQuit = IOUtils.isQuit(message, socket, CLIENT_SOCKET_SET);
                                if (isQuit) {
                                    socket.close();
                                    break;
                                } else {
                                    //消息转发
                                    IOUtils.forwardMessage(port, message, CLIENT_SOCKET_SET);
                                }
                            }
                        } catch (IOException e) {
                            logger.error("发生异常", e);
                        }
                    });
                }
            } catch (IOException e) {
                logger.error("发生异常", e);
            }
        }
    
    
        public static void main(String[] args) {
            new BioChatServer().start();
        }
    }
    
    • 客户端下线与消息转发
    /**
         * 判断客户端是否下线,并且将需要下线的客户端下线
         *
         * @param message         消息
         * @param socket          客户端Socket
         * @param clientSocketSet 当前接入的客户端Socket集合
         * @return 是否退出
         * @throws IOException
         */
        public static boolean isQuit(String message, Socket socket, Set<Socket> clientSocketSet) throws IOException {
            boolean isQuit = StringUtils.isBlank(message) || Constants.KEY_WORD_QUIT.equals(message);
            if (isQuit) {
                clientSocketSet.remove(socket);
                int port = socket.getPort();
                socket.close();
                logger.debug("客户端[{}]下线", port);
            }
            return isQuit;
        }
    
        /**
         * 转发消息
         *
         * @param curSocketPort   当前发送消息的客户端Socket的端口
         * @param message         需要转发的消息
         * @param clientSocketSet 当前接入的客户端Socket集合
         */
        public static void forwardMessage(int curSocketPort, String message, Set<Socket> clientSocketSet) {
            if (StringUtils.isBlank(message)) {
                return;
            }
            for (Socket socket : clientSocketSet) {
                if (socket.isClosed() || socket.getPort() == curSocketPort) {
                    continue;
                }
                if (socket.getPort() != curSocketPort) {
                    try {
                        OutputStream outputStream = socket.getOutputStream();
                        byte[] messageBytes = message.getBytes(Constants.CHARSET);
                        outputStream.write(messageBytes.length);
                        //将字符串编码之后写入客户端
                        outputStream.write(messageBytes);
                        //刷新缓冲区
                        outputStream.flush();
                    } catch (IOException e) {
                        logger.error("消息转发失败", e);
                    }
                }
            }
        }
    
    

    # 测试一下~

    • 服务端启动,客户端接入
    image.png
    • 客户端接入
    image.png
    • 客户端发送消息
    image.png
    • 服务端打印并转发消息
    image.png
    • 聊天室内的其他小伙伴收到服务器转发的消息
    image.png
    • 小马哥客户端下线
    image.png
    • 服务器收到小马哥的下线通知
    image.png

    # 总结

    • 非常优雅~😊

    # 注意

    • 本文约定的是第一个字节为消息大小的标记,一个字节可以表示的最大值为255,所以一次最多传输255个字节,如果超过这个值,会造成业务错误,需要注意。
    • 所以使用几个字节来作为标识需要从业务的角度来考虑
      • 一个字节8位,可表示的最大值为 255 = 255B
      • 二个字节16位,可表示的最大值为 65535 = 64KB
      • 三个字节24位,可表示的最大值为 16777215 = 16MB
      • 四个字节32位,可表示的最大值为 4294967295 = 4GB
      • 以此类推....

    # 系列文章

    欢迎在评论区留下你看文章时的思考,及时说出,有助于加深记忆和理解,还能和像你一样也喜欢这个话题的读者相遇~

    image.png

    相关文章

      网友评论

        本文标题:【BIO】通过指定消息大小实现的多人聊天室-终极版本

        本文链接:https://www.haomeiwen.com/subject/anpwqktx.html