一、背景
在线课堂业务中,参与方的教师、学生和助教等人员,在实时上课的过程中,需要做到消息的及时传达。
一次上课,可以认为是创建了一个房间,教师把学生都加入到房间,然后就可以在房间里互发消息。
消息的发送又分为单播和广播两种模式。
本文主要是概述如何使用通道技术实现一个聊天系统,介绍一些我们公司的设计与实现思路,希望能够给到大家一些帮助。
二、目标
- 实现群聊与私聊的聊天系统
- 支持多种业务形态下的消息通道,做到数据隔离
- 消息的持久化,支持离线消息
- 统计和分析房间及用户的活跃、在线情况
- 支持多种自定义的命令消息
三、ChannelGroup与Channel
3.1、channel在分布式环境下的保存问题
由于它不能存储到redis,因为netty的channel是一个连接,是和机器的硬件绑定的,无法序列化。所以一般有两种做法,前者是在连接层上面再设计一个负载均衡层,由它来寻址转发到哪台后端机器;后者是让命令消息广播,让每个节点都尝试处理,如果当前用户的通道不在当前机器的内存中,则忽略。总之,保证消息有且只有一个后端服务的节点会处理消息。
3.1.1、使用rabbitmq的广播模式fanout
所有bind到此exchange的queue都可以接收消息。
代码示例:
/**
* 通道队列名
*/
private String channelQueue = "ykt.channel.queue." + HostUtils.getMac();
/**
* 死信队列通道名
*/
private String deadChannelQueue = "ykt.channel.dead.queue";
/**
* 通道交换机名
*/
private String channelExchange = "ykt.channel.exchange";
/**
* 死信交换机名
*/
private String deadExchange = "ykt.channel.dead.letter.exchange";
/**
* 死信路由名
*/
private String deadRoutingKey = "ykt.channel.dead.letter.routeKey";
/**
* 通道队列路有名
*/
private String channelRoutingKey = "ykt.channel.routeKey";
/**
* 通道队列
*
* @param: []
* @return: org.springframework.amqp.core.Queue
**/
@Bean
public Queue channelQueue() {
Map<String, Object> params = new HashMap<>();
params.put("x-message-ttl", 5 * 1000);
params.put("x-dead-letter-exchange", deadExchange);
params.put("x-dead-letter-routing-key", deadRoutingKey);
return new Queue(channelQueue, false, false, false, params);
}
/**
* 通道交换机 fanout广播模式
*
* @param: []
* @return: org.springframework.amqp.core.Exchange
**/
@Bean
public Exchange channelExchange() {
return ExchangeBuilder.fanoutExchange(channelExchange).durable(true).build();
}
/**
* 通道队列绑定交换机
*
* @param: []
* @return: org.springframework.amqp.core.Binding
**/
@Bean
public Binding channelBinding() {
return BindingBuilder.bind(channelQueue()).to(channelExchange()).with(channelQueue).noargs();
}
3.1.2、redis的订阅与广播
redis支持消息订阅与发布机制,可以使用该机制实现不同服务间的消息转发。
在广播消息时,需要携带能唯一标识接收者身份的字段(例如clientId)。消息广播结束后,所有服务端会收到该消息,服务端仅仅需要判断该消息接收者的是否是连接的自己作为服务端。若发现该接收者正是连接的自己,则直接将消息转发到该客户端即可。
3.2、channel和user的映射需要程序自己实现
image.png建立通道与通道组,和用户与房间的映射关系。通过下面一图可见:
具体实现,我们是使用Map集合,将之存储在Jvm内存中。也就是说,在应用重启的时候,会丢失用户和通道,房间和通道组的关联。随之,也会断开和端建立的通道,用户会需要重新进入房间。
换句话说,应用在重启的时候,会给用户带来极大的不便,体验方面也影响挺大。这也是为什么IM系统都会在最外面有连接层,极少重启连接层,重启的都是后面的消息处理模块。
-
用户加入房间的流程
image.png
场景补充:userId要加入到roomId,通道建立的时候,我们需要将用户与channel,房间与channelGroup建立映射关系。
-
用户查找房间的通道
image.png
场景补充:用户的通道,是指用户在某个房间下的通道,是脱离不开房间来说通道的。
3.3、数据结构设计
保存所有的房间(通道组)及其里面的成员用户(通道)。
/**
* key是groupId
* value是ChannelGroup
* 存放ChannelGroup
* groupId 也就是roomId
*/
private static Map<String, ChannelGroup> groupIdChannelGroupMap = new ConcurrentHashMap<>();
/**
* key是groupId
* value又是一个Key-Value格式,是每个用户对应的Channel
* [groupId -> [clientId1->channelId1,clientId2->channelId2,clientId3->channelId3,****],]
* 存放Channel
* groupId 也就是roomId
* clientId 也就是userId
* channelId 也就是Channel的id
*/
private static Map<String, Map<String, ChannelId>> groupIdClientIdChannelIdMap = new ConcurrentHashMap<>();
四、Netty的启动流程
先贴一张图:
netty架构概念图.png image.png
配置项
- Netty启动端口
- IdleStateHandler 的心跳时间
五、数模设计
image.png六、多种业务通道的实现
image.png- 自定义注解
- 设计模式之工厂方法
- 反射技术
初始化
应用重启的时候,扫描所有的注解及方法,读取上面的属性,然后缓存到集合。为后面的工厂方法做准备。
- 类注册
-
方法注册
image.png
工厂方法
读取请求报文中的appKey和command参数,根据appKey查找到对应的EndpointService实现类,根据command查到对应的Method方法。
使用Method.invoke()方法执行具体的动作。
// 类Object的集合instanceMapper
Object instance = instanceMapper.getEndpointInstance(appKey);
// 方法Method的结合pointMapper
Method onCommand = pointMapper.getOnCommand(appKey, command);
// instance是指向类,第二个参数是Session,第三个参数text是消息内容
onCommand.invoke(instance, session, text);
七、通道的安全
每次请求都校验token的有效期,如果已过期,则提示重新登录或自动延长有效期。
- 让客户端输入用户名密码,然后由服务端颁发token,后续的每次请求都由客户端带上该token
- 如果是没有交互界面,一般由客户端把登录系统的token替代,由通道服务去调用认证服务验证
- 增加签名字段作为额外的安全校验
八、参考链接
九、存在的问题
- 没有引入负载均衡层,用户的通道缺少路由,我们采用的mq广播方式,让每个节点都尝试去处理消息,性能消耗不少。
- 假集群,某个节点挂了,会强制断开和客户端的连接。在不怎么发版的情况下,勉强能接受。
- 但是集群无法做到消息转发,会有单点故障,节点A的消息不能交由节点B去发送给客户端。
网友评论