美文网首页收藏技术Codeweb常用技能
欢迎来借鉴分布式WebSocket解决方案

欢迎来借鉴分布式WebSocket解决方案

作者: HeloWxl | 来源:发表于2021-12-05 13:47 被阅读0次

    单体Webscoket

    • springboot版本: 2.1.1.RELEASE
    • jdk: 1.8

    示例代码

    • WebsocketServer
    @ServerEndpoint("/client/{userName}")
    @Component
    @Slf4j
    public class WebSocketServer {
    
        /**
         * 静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
         */
        private static int onlineCount = 0;
        /**
         * concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
         */
        private static ConcurrentHashMap<String, WebSocketServer> webSocketMap = new ConcurrentHashMap<>();
        /**
         * 与某个客户端的连接会话,需要通过它来给客户端发送数据
         */
        private Session session;
        /**
         * 接收userId
         */
        private String userName = "";
    
        /**
         * @Description: 连接建立成功调用的方法,成功建立之后,将用户的userName 存储到redis
         * @params: [session, userId]
         * @return: void
         * @Author: wangxianlin
         * @Date: 2020/5/9 9:13 PM
         */
        @OnOpen
        public void onOpen(Session session, @PathParam("userName") String userName) {
            this.session = session;
            this.userName = userName;
            webSocketMap.put(userName, this);
            addOnlineCount();
            log.info("用户连接:" + userName + ",当前在线人数为:" + getOnlineCount());
        }
    
        /**
         * @Description: 连接关闭调用的方法
         * @params: []
         * @return: void
         * @Author: wangxianlin
         * @Date: 2020/5/9 9:13 PM
         */
        @OnClose
        public void onClose() {
            if (webSocketMap.containsKey(userName)) {
                webSocketMap.remove(userName);
                //从set中删除
                subOnlineCount();
            }
            log.info("用户退出:" + userName + ",当前在线人数为:" + getOnlineCount());
        }
    
    
        /**
         * @Description: 收到客户端消息后调用的方法, 调用API接口 发送消息到
         * @params: [message, session]
         * @return: void
         * @Author: wangxianlin
         * @Date: 2020/5/9 9:13 PM
         */
        @OnMessage
        public void onMessage(String message, @PathParam("userName") String userName) {
            log.info("用户消息:" + userName + ",报文:" + message);
            if (StringUtils.isNotBlank(message)) {
                try {
                    //解析发送的报文
                    JSONObject jsonObject = JSON.parseObject(message);
                    //追加发送人(防止串改)
                    jsonObject.put("sender", this.userName);
                    String receiver = jsonObject.getString("receiver");
                    //传送给对应toUserId用户的websocket
                    if (StringUtils.isNotBlank(receiver) && webSocketMap.containsKey(receiver)) {
                        webSocketMap.get(receiver).session.getBasicRemote().sendText(jsonObject.toJSONString());
                    } else {
                        log.error("用户:" + receiver + "不在该服务器上");
                        //否则不在这个服务器上,发送到mysql或者redis
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    
    
        /**
         * 发布websocket消息
         * 消息格式: { "sender": "u2","receiver": "u1","msg": "hello world","createTime":"2021-10-12 11:12:11"}
         *
         * @param dto
         * @return
         */
        public static void sendWebsocketMessage(ChatMsg dto) {
            if (dto != null) {
                if (StringUtils.isNotBlank(dto.getReceiver()) && webSocketMap.containsKey(dto.getReceiver())) {
                    String json = JSON.toJSONString(dto);
                    try {
                        webSocketMap.get(dto.getReceiver()).session.getBasicRemote().sendText(json);
                    } catch (IOException e) {
                        log.error("消息发送异常:{}", e.toString());
                    }
                } else {
                    log.error("用户:" + dto.getReceiver() + ",不在线!");
                }
            }
        }
    
        /**
         * @param session
         * @param error
         */
        @OnError
        public void onError(Session session, Throwable error) {
            log.error("用户错误:" + this.userName + ",原因:" + error.getMessage());
            error.printStackTrace();
        }
    
        /**
         * @Description: 获取在线人数
         * @params: []
         * @return: int
         * @Author: wangxianlin
         * @Date: 2020/5/9 9:09 PM
         */
        public static synchronized int getOnlineCount() {
            return onlineCount;
        }
    
        /**
         * @Description: 在线人数+1
         * @params: []
         * @return: void
         * @Author: wangxianlin
         * @Date: 2020/5/9 9:09 PM
         */
        public static synchronized void addOnlineCount() {
            WebSocketServer.onlineCount++;
        }
    
        /**
         * @Description: 在线人数-1
         * @params: []
         * @return: void
         * @Author: wangxianlin
         * @Date: 2020/5/9 9:09 PM
         */
        public static synchronized void subOnlineCount() {
            WebSocketServer.onlineCount--;
        }
    }
    
    • WebSocketConfig
    @Configuration
    public class WebSocketConfig {
        @Bean
        public ServerEndpointExporter serverEndpointExporter(){
            return new ServerEndpointExporter();
        }
    }
    
    • 前端代码
    var socket;
        var userName;
        establishConnection()
        /***建立连接*/
        function establishConnection() {
            userName = $("#sender").val();
            if (userName == '' || userName == null) {
                alert("请输入发送者");
                return;
            }
            //实现化WebSocket对象,指定要连接的服务器地址与端口  建立连接
            var socketUrl = "" + window.location.protocol + "//" + window.location.host + "/client/" + userName;
            socketUrl = socketUrl.replace("https", "ws").replace("http", "ws");
            if (socket != null) {
                socket.close();
                socket = null;
            }
            socket = new WebSocket(socketUrl);
            //打开事件
            socket.onopen = function () {
                console.log("开始建立链接....")
            };
            //关闭事件
            socket.onclose = function () {
                console.log("websocket已关闭");
            };
            //发生了错误事件
            socket.onerror = function () {
                console.log("websocket发生了错误");
            };
            /**
             * 接收消息
             * @param msg
             */
            socket.onmessage = function (msg) {
                msg = JSON.parse(msg.data);
                console.log(msg);
                if (msg.msg != '连接成功') {
                    $("#msgDiv").append('<p class="other">用户名:' + msg.sender + '</p><p class="chat">' + msg.msg + '</p>');
                }
            };
        }
        /**
         * 发送消息
         */
        function sendMessage() {
            var msg = $("#msg").val();
            if (msg == '' || msg == null) {
                alert("消息内容不能为空");
                return;
            }
            var receiver = $("#receiver").val();
            if (receiver == '' || receiver == null) {
                alert("接收人不能为空");
                return;
            }
            var msgObj = {
                "receiver": receiver,
                "msg": msg
            };
            $("#msgDiv").append('<p class="user">用户名:' + userName + '</p><p class="chat">' + msg + '</p>');
            try{
                socket.send(JSON.stringify(msgObj));
                $("#msg").val('');
            }catch (e) {
                alert("服务器内部错误");
            }
        }
    
    • 测试效果


      image.png
    • 问题
      如果两个客户端连接不在同一个服务器上,会出现什么问题?
      结果就是如下所示:
      image.png

    如何解决多台客户端连接在不同服务器,互相发送消息问题!

    分布式WebSocket 解决

    方案一 Redis消息订阅与发布

    image.png

    描述:
    客户端A 和客户端B 都订阅同一个Topic ,后台Websocket收到消息后,将消息发送至Redis中,同时服务端会监听该渠道内的消息,监听到消息后,会将消息推送至对应的客户端。

    示例代码

    • application.yml
      主要是Redis配置
    server:
      port: 8082
    
    spring:
      thymeleaf:
        #模板的模式,支持 HTML, XML TEXT JAVASCRIPT
        mode: HTML5
        #编码 可不用配置
        encoding: UTF-8
        #内容类别,可不用配置
        content-type: text/html
        #开发配置为false,避免修改模板还要重启服务器
        cache: false
    #    #配置模板路径,默认是templates,可以不用配置
        prefix: classpath:/templates
        suffix: .html
    
      #Redis配置
      redis:
        host: localhost
        port: 6379
        password: 123456
        timeout: 5000
    
    • RedisSubscriberConfig.java
    /**
     * @Description 消息订阅配置类
     * @Author wxl
     * @Date 2020/3/31 13:54
     */
    @Configuration
    public class RedisSubscriberConfig {
        /**
         * 消息监听适配器,注入接受消息方法
         *
         * @param receiver
         * @return
         */
        @Bean
        public MessageListenerAdapter messageListenerAdapter(ChatMessageListener receiver) {
            return new MessageListenerAdapter(receiver);
        }
        /**
         * 创建消息监听容器
         *
         * @param redisConnectionFactory
         * @param messageListenerAdapter
         * @return
         */
        @Bean
        public RedisMessageListenerContainer getRedisMessageListenerContainer(RedisConnectionFactory redisConnectionFactory, MessageListenerAdapter messageListenerAdapter) {
            RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
            redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory);
            redisMessageListenerContainer.addMessageListener(messageListenerAdapter, new PatternTopic(TOPIC_CUSTOMER));
            return redisMessageListenerContainer;
        }
    }
    
    • RedisUtil.java
    @Component
    public class RedisUtil {
    
        @Autowired
        private StringRedisTemplate stringRedisTemplate;
        /**
         * 发布
         *
         * @param key
         */
        public void publish(String key, String value) {
            stringRedisTemplate.convertAndSend(key, value);
        }
    }
    
    • ChatMessageListener.java
    /**
     * @Description 集群聊天消息监听器
     * @Author wxl
     * @Date 2020/3/29 15:07
     */
    @Slf4j
    @Component
    public class ChatMessageListener implements MessageListener {
    
        @Autowired
        private StringRedisTemplate redisTemplate;
    
        @Override
        public void onMessage(Message message, byte[] pattern) {
            RedisSerializer<String> valueSerializer = redisTemplate.getStringSerializer();
            String value = valueSerializer.deserialize(message.getBody());
            ChatMsg dto = null;
            if (StringUtils.isNotBlank(value)) {
                try {
                    dto = JacksonUtil.json2pojo(value, ChatMsg.class);
                } catch (Exception e) {
                    e.printStackTrace();
                    log.error("消息格式转换异常:{}", e.toString());
                }
                log.info("监听集群websocket消息--- {}", value);
                WebSocketServer.sendWebsocketMessage(dto);
            }
        }
    }
    
    • WebSocketServer
    @ServerEndpoint("/client/{userName}")
    @Component
    @Slf4j
    public class WebSocketServer {
    
        /**
         * 静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
         */
        private static int onlineCount = 0;
        /**
         * concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
         */
        private static ConcurrentHashMap<String, WebSocketServer> webSocketMap = new ConcurrentHashMap<>();
        /**
         * 与某个客户端的连接会话,需要通过它来给客户端发送数据
         */
        private Session session;
    
        /**
         * 不能使用@AutoWire原因:发现注入不了redis,redis注入失败 可能是因为实例化的先后顺序吧,WebSocket先实例化了,  但是@Autowire是会触发getBean操作
         * 因为@ServerEndpoint不支持注入,所以使用SpringUtils获取IOC实例
         */
        private RedisUtil redisUtil = SpringUtils.getBean(RedisUtil.class);
    
        /**
         * 接收userId
         */
        private String userName = "";
    
     
    
        /**
         * @Description: 连接建立成功调用的方法,成功建立之后,将用户的userName 存储到redis
         * @params: [session, userId]
         * @return: void
         * @Author: wangxianlin
         * @Date: 2020/5/9 9:13 PM
         */
        @OnOpen
        public void onOpen(Session session, @PathParam("userName") String userName) {
            this.session = session;
            this.userName = userName;
            webSocketMap.put(userName, this);
            addOnlineCount();
            log.info("用户连接:" + userName + ",当前在线人数为:" + getOnlineCount());
        }
    
        /**
         * @Description: 连接关闭调用的方法
         * @params: []
         * @return: void
         * @Author: wangxianlin
         * @Date: 2020/5/9 9:13 PM
         */
        @OnClose
        public void onClose() {
            if (webSocketMap.containsKey(userName)) {
                webSocketMap.remove(userName);
                //从set中删除
                subOnlineCount();
            }
            log.info("用户退出:" + userName + ",当前在线人数为:" + getOnlineCount());
        }
    
    
        /**
         * @Description: 收到客户端消息后调用的方法, 调用API接口 发送消息到
         * @params: [message, session]
         * @return: void
         * @Author: wangxianlin
         * @Date: 2020/5/9 9:13 PM
         */
        @OnMessage
        public void onMessage(String message, @PathParam("userName") String userName) {
            log.info("用户消息:" + userName + ",报文:" + message);
            if (StringUtils.isNotBlank(message)) {
                try {
                    //解析发送的报文
                    JSONObject jsonObject = JSON.parseObject(message);
                    //追加发送人(防止串改)
                    jsonObject.put("sender", this.userName);
                    //传送给对应toUserId用户的websocket
                    redisUtil.publish(TOPIC_CUSTOMER,jsonObject.toString());
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    
    
        /**
         * 发布websocket消息
         * 消息格式: { "sender": "u2","receiver": "u1","msg": "hello world","createTime":"2021-10-12 11:12:11"}
         *
         * @param dto
         * @return
         */
        public static void sendWebsocketMessage(ChatMsg dto) {
            if (dto != null) {
                if (StringUtils.isNotBlank(dto.getReceiver()) && webSocketMap.containsKey(dto.getReceiver())) {
                    String json = JSON.toJSONString(dto);
                    try {
                        webSocketMap.get(dto.getReceiver()).session.getBasicRemote().sendText(json);
                    } catch (IOException e) {
                        log.error("消息发送异常:{}", e.toString());
                    }
                } else {
                    log.error("用户:" + dto.getReceiver() + ",不在次服务器上!");
                }
            }
        }
    
        /**
         * @param session
         * @param error
         */
        @OnError
        public void onError(Session session, Throwable error) {
            log.error("用户错误:" + this.userName + ",原因:" + error.getMessage());
            error.printStackTrace();
        }
    
        /**
         * @Description: 获取在线人数
         * @params: []
         * @return: int
         * @Author: wangxianlin
         * @Date: 2020/5/9 9:09 PM
         */
        public static synchronized int getOnlineCount() {
            return onlineCount;
        }
    
        /**
         * @Description: 在线人数+1
         * @params: []
         * @return: void
         * @Author: wangxianlin
         * @Date: 2020/5/9 9:09 PM
         */
        public static synchronized void addOnlineCount() {
            WebSocketServer.onlineCount++;
        }
    
        /**
         * @Description: 在线人数-1
         * @params: []
         * @return: void
         * @Author: wangxianlin
         * @Date: 2020/5/9 9:09 PM
         */
        public static synchronized void subOnlineCount() {
            WebSocketServer.onlineCount--;
        }
    }
    
    • 测试效果


      image.png

    方案二 RabbitMq

    暂未完成,敬请期待!

    相关文章

      网友评论

        本文标题:欢迎来借鉴分布式WebSocket解决方案

        本文链接:https://www.haomeiwen.com/subject/goqixrtx.html