美文网首页
Redis 发布订阅实现分布式 WebSocket 消息通讯

Redis 发布订阅实现分布式 WebSocket 消息通讯

作者: 极简博客 | 来源:发表于2023-04-09 15:37 被阅读0次

在分布式环境下,由于一个服务通常会有多个实例,这就使得客户端在多次请求时,可能会作用到不同的实例,造成 Socket Session 找不到问题。那怎么解决呢?本文将通过使用 Redis 发布订阅模式解决这一问题。

逻辑图

整体逻辑

管理员发送消息

@GetMapping("/message/send")
public Boolean send(@RequestParam(defaultValue = "1", required = false) Long id,
@RequestParam(defaultValue = "测试消息", required = false) String content) {
    WebsocketUtil.sendDistributed(WebsocketMessage.builder().id(id).content(content).build());
    return true;
}

每个客户端连接的 session 都会存储到WebsocketUtils类Map<ID, WebSocketServer>变量中,这也是造成分布式下发送消息可能会找不到session的原因。

Redis 消息发布与订阅,实现每个服务实例的消息分发

public static void sendDistributed(WebsocketMessage websocketMessage) {
    StringRedisTemplate template = SpringUtils.getBean(StringRedisTemplate.class);
    template.convertAndSend(Topics.CHANNEL_MESSAGE, JSON.toJSONString(websocketMessage));
}

监听到 Redis 发送过来的消息

@Component
public class WebsocketMessageListener extends AbstractMessageListener<WebsocketMessage> {
  @Override
  public void onMessage(WebsocketMessage websocketMessage) {
    WebsocketUtil.sendStandalone(websocketMessage);
  }
}

WebsocketMessage 为整个过程传递的消息体,包含 客户端编号(对应上述 Map 的 key 值)和消息内容,就是要向什么人发送什么内容的消息。

Websocket 进行单机消息发送

public static void sendStandalone(WebsocketMessage websocketMessage) {
    if (websocketMap.containsKey(websocketMessage.getId())) {
        try {
            websocketMap.get(websocketMessage.getId()).getSession().getBasicRemote().sendText(websocketMessage.getContent());
        } catch (IOException e) {
            log.error("服务端发送消息【{}】失败,原因:【{}】", websocketMessage.getId(), e.getMessage());
        }
        log.info("服务端发送消息给客户端【{}】成功,内容为【{}】", websocketMessage.getId(), websocketMessage.getContent());
    } else {
        log.error("服务端发送消息【{}】失败,原因:客户端【{}】未连接", websocketMessage.getContent(), websocketMessage.getId());
    }
}

找到存储在本地缓存中的 WebSocketServer(对应上述 Map 的 value 值) 进行单机消息发送。

测试结果

客户端
服务端

思考与拓展?

1、消息的可靠性,redis 消息发布,客户端没有收到问题?持久化,未发送的消息继续发送或使用更加可靠的消息中间件
2、消息重复消费问题?重复处理
3、websocket掉线重连? 客户端处理,在每次掉线的时候延迟重连

由于作者学识有限,文中如有不足之处或有需要改进和优化的地方,不吝赐教。

参考文章

如何Redis解决WebSocket分布式场景下的Session共享问题
WEBSOCKET 在线测试工具

相关文章

网友评论

      本文标题:Redis 发布订阅实现分布式 WebSocket 消息通讯

      本文链接:https://www.haomeiwen.com/subject/uymbddtx.html