美文网首页
Redis 发布订阅实现分布式 WebSocket 消息通讯

Redis 发布订阅实现分布式 WebSocket 消息通讯

作者: 极简博客 | 来源:发表于2023-04-09 15:37 被阅读0次

    在分布式环境下,由于一个服务通常会有多个实例,这就使得客户端在多次请求时,可能会作用到不同的实例,造成 Socket Session 找不到问题。那怎么解决呢?本文将通过使用 Redis 发布订阅模式解决这一问题。

    逻辑图

    整体逻辑

    管理员发送消息

    @GetMapping("/message/send")
    public Boolean send(@RequestParam(defaultValue = "1", required = false) Long id,
    @RequestParam(defaultValue = "测试消息", required = false) String content) {
        WebsocketUtil.sendDistributed(WebsocketMessage.builder().id(id).content(content).build());
        return true;
    }
    

    每个客户端连接的 session 都会存储到WebsocketUtils类Map<ID, WebSocketServer>变量中,这也是造成分布式下发送消息可能会找不到session的原因。

    Redis 消息发布与订阅,实现每个服务实例的消息分发

    public static void sendDistributed(WebsocketMessage websocketMessage) {
        StringRedisTemplate template = SpringUtils.getBean(StringRedisTemplate.class);
        template.convertAndSend(Topics.CHANNEL_MESSAGE, JSON.toJSONString(websocketMessage));
    }
    

    监听到 Redis 发送过来的消息

    @Component
    public class WebsocketMessageListener extends AbstractMessageListener<WebsocketMessage> {
      @Override
      public void onMessage(WebsocketMessage websocketMessage) {
        WebsocketUtil.sendStandalone(websocketMessage);
      }
    }
    

    WebsocketMessage 为整个过程传递的消息体,包含 客户端编号(对应上述 Map 的 key 值)和消息内容,就是要向什么人发送什么内容的消息。

    Websocket 进行单机消息发送

    public static void sendStandalone(WebsocketMessage websocketMessage) {
        if (websocketMap.containsKey(websocketMessage.getId())) {
            try {
                websocketMap.get(websocketMessage.getId()).getSession().getBasicRemote().sendText(websocketMessage.getContent());
            } catch (IOException e) {
                log.error("服务端发送消息【{}】失败,原因:【{}】", websocketMessage.getId(), e.getMessage());
            }
            log.info("服务端发送消息给客户端【{}】成功,内容为【{}】", websocketMessage.getId(), websocketMessage.getContent());
        } else {
            log.error("服务端发送消息【{}】失败,原因:客户端【{}】未连接", websocketMessage.getContent(), websocketMessage.getId());
        }
    }
    

    找到存储在本地缓存中的 WebSocketServer(对应上述 Map 的 value 值) 进行单机消息发送。

    测试结果

    客户端
    服务端

    思考与拓展?

    1、消息的可靠性,redis 消息发布,客户端没有收到问题?持久化,未发送的消息继续发送或使用更加可靠的消息中间件
    2、消息重复消费问题?重复处理
    3、websocket掉线重连? 客户端处理,在每次掉线的时候延迟重连

    由于作者学识有限,文中如有不足之处或有需要改进和优化的地方,不吝赐教。

    参考文章

    如何Redis解决WebSocket分布式场景下的Session共享问题
    WEBSOCKET 在线测试工具

    相关文章

      网友评论

          本文标题:Redis 发布订阅实现分布式 WebSocket 消息通讯

          本文链接:https://www.haomeiwen.com/subject/uymbddtx.html