在分布式环境下,由于一个服务通常会有多个实例,这就使得客户端在多次请求时,可能会作用到不同的实例,造成 Socket Session 找不到问题。那怎么解决呢?本文将通过使用 Redis 发布订阅模式解决这一问题。
逻辑图
整体逻辑管理员发送消息
@GetMapping("/message/send")
public Boolean send(@RequestParam(defaultValue = "1", required = false) Long id,
@RequestParam(defaultValue = "测试消息", required = false) String content) {
WebsocketUtil.sendDistributed(WebsocketMessage.builder().id(id).content(content).build());
return true;
}
每个客户端连接的 session 都会存储到WebsocketUtils类Map<ID, WebSocketServer>变量中,这也是造成分布式下发送消息可能会找不到session的原因。
Redis 消息发布与订阅,实现每个服务实例的消息分发
public static void sendDistributed(WebsocketMessage websocketMessage) {
StringRedisTemplate template = SpringUtils.getBean(StringRedisTemplate.class);
template.convertAndSend(Topics.CHANNEL_MESSAGE, JSON.toJSONString(websocketMessage));
}
监听到 Redis 发送过来的消息
@Component
public class WebsocketMessageListener extends AbstractMessageListener<WebsocketMessage> {
@Override
public void onMessage(WebsocketMessage websocketMessage) {
WebsocketUtil.sendStandalone(websocketMessage);
}
}
WebsocketMessage
为整个过程传递的消息体,包含 客户端编号(对应上述 Map 的 key 值)和消息内容,就是要向什么人发送什么内容的消息。
Websocket 进行单机消息发送
public static void sendStandalone(WebsocketMessage websocketMessage) {
if (websocketMap.containsKey(websocketMessage.getId())) {
try {
websocketMap.get(websocketMessage.getId()).getSession().getBasicRemote().sendText(websocketMessage.getContent());
} catch (IOException e) {
log.error("服务端发送消息【{}】失败,原因:【{}】", websocketMessage.getId(), e.getMessage());
}
log.info("服务端发送消息给客户端【{}】成功,内容为【{}】", websocketMessage.getId(), websocketMessage.getContent());
} else {
log.error("服务端发送消息【{}】失败,原因:客户端【{}】未连接", websocketMessage.getContent(), websocketMessage.getId());
}
}
找到存储在本地缓存中的 WebSocketServer(对应上述 Map 的 value 值) 进行单机消息发送。
测试结果
客户端服务端
思考与拓展?
1、消息的可靠性,redis 消息发布,客户端没有收到问题?持久化,未发送的消息继续发送或使用更加可靠的消息中间件
2、消息重复消费问题?重复处理
3、websocket掉线重连? 客户端处理,在每次掉线的时候延迟重连
由于作者学识有限,文中如有不足之处或有需要改进和优化的地方,不吝赐教。
网友评论