美文网首页
通道服务的开发设计

通道服务的开发设计

作者: 天草二十六_简村人 | 来源:发表于2022-10-12 15:47 被阅读0次

一、背景

在线课堂业务中,参与方的教师、学生和助教等人员,在实时上课的过程中,需要做到消息的及时传达。
一次上课,可以认为是创建了一个房间,教师把学生都加入到房间,然后就可以在房间里互发消息。
消息的发送又分为单播和广播两种模式。

本文主要是概述如何使用通道技术实现一个聊天系统,介绍一些我们公司的设计与实现思路,希望能够给到大家一些帮助。

二、目标

  • 实现群聊与私聊的聊天系统
  • 支持多种业务形态下的消息通道,做到数据隔离
  • 消息的持久化,支持离线消息
  • 统计和分析房间及用户的活跃、在线情况
  • 支持多种自定义的命令消息

三、ChannelGroup与Channel

3.1、channel在分布式环境下的保存问题

由于它不能存储到redis,因为netty的channel是一个连接,是和机器的硬件绑定的,无法序列化。所以一般有两种做法,前者是在连接层上面再设计一个负载均衡层,由它来寻址转发到哪台后端机器;后者是让命令消息广播,让每个节点都尝试处理,如果当前用户的通道不在当前机器的内存中,则忽略。总之,保证消息有且只有一个后端服务的节点会处理消息。

3.1.1、使用rabbitmq的广播模式fanout

所有bind到此exchange的queue都可以接收消息。

代码示例:


    /**
     * 通道队列名
     */
    private String channelQueue = "ykt.channel.queue." + HostUtils.getMac();
    /**
     * 死信队列通道名
     */
    private String deadChannelQueue = "ykt.channel.dead.queue";
    /**
     * 通道交换机名
     */
    private String channelExchange = "ykt.channel.exchange";

    /**
     * 死信交换机名
     */
    private String deadExchange = "ykt.channel.dead.letter.exchange";
    /**
     * 死信路由名
     */
    private String deadRoutingKey = "ykt.channel.dead.letter.routeKey";
    /**
     * 通道队列路有名
     */
    private String channelRoutingKey = "ykt.channel.routeKey";

    /**
     * 通道队列
     *
     * @param: []
     * @return: org.springframework.amqp.core.Queue
     **/
    @Bean
    public Queue channelQueue() {
        Map<String, Object> params = new HashMap<>();
        params.put("x-message-ttl", 5 * 1000);
        params.put("x-dead-letter-exchange", deadExchange);
        params.put("x-dead-letter-routing-key", deadRoutingKey);
        return new Queue(channelQueue, false, false, false, params);
    }

    /**
     * 通道交换机 fanout广播模式
     *
     * @param: []
     * @return: org.springframework.amqp.core.Exchange
     **/
    @Bean
    public Exchange channelExchange() {
        return ExchangeBuilder.fanoutExchange(channelExchange).durable(true).build();
    }

    /**
     * 通道队列绑定交换机
     *
     * @param: []
     * @return: org.springframework.amqp.core.Binding
     **/
    @Bean
    public Binding channelBinding() {
        return BindingBuilder.bind(channelQueue()).to(channelExchange()).with(channelQueue).noargs();
    }

3.1.2、redis的订阅与广播

redis支持消息订阅与发布机制,可以使用该机制实现不同服务间的消息转发。
在广播消息时,需要携带能唯一标识接收者身份的字段(例如clientId)。消息广播结束后,所有服务端会收到该消息,服务端仅仅需要判断该消息接收者的是否是连接的自己作为服务端。若发现该接收者正是连接的自己,则直接将消息转发到该客户端即可。

3.2、channel和user的映射需要程序自己实现

建立通道与通道组,和用户与房间的映射关系。通过下面一图可见:

image.png

具体实现,我们是使用Map集合,将之存储在Jvm内存中。也就是说,在应用重启的时候,会丢失用户和通道,房间和通道组的关联。随之,也会断开和端建立的通道,用户会需要重新进入房间。
换句话说,应用在重启的时候,会给用户带来极大的不便,体验方面也影响挺大。这也是为什么IM系统都会在最外面有连接层,极少重启连接层,重启的都是后面的消息处理模块。

  • 用户加入房间的流程


    image.png

    场景补充:userId要加入到roomId,通道建立的时候,我们需要将用户与channel,房间与channelGroup建立映射关系。

  • 用户查找房间的通道


    image.png

    场景补充:用户的通道,是指用户在某个房间下的通道,是脱离不开房间来说通道的。

3.3、数据结构设计

保存所有的房间(通道组)及其里面的成员用户(通道)。

    /**
     * key是groupId 
     * value是ChannelGroup
     * 存放ChannelGroup
     * groupId 也就是roomId
     */
    private static Map<String, ChannelGroup> groupIdChannelGroupMap = new ConcurrentHashMap<>();

    /**
     * key是groupId 
     * value又是一个Key-Value格式,是每个用户对应的Channel
     * [groupId -> [clientId1->channelId1,clientId2->channelId2,clientId3->channelId3,****],]
    * 存放Channel
     * groupId 也就是roomId
     * clientId 也就是userId
     * channelId 也就是Channel的id
     */
    private static Map<String, Map<String, ChannelId>> groupIdClientIdChannelIdMap = new ConcurrentHashMap<>();

四、Netty的启动流程

先贴一张图:


netty架构概念图.png image.png

配置项

  • Netty启动端口
  • IdleStateHandler 的心跳时间

五、数模设计

image.png

六、多种业务通道的实现

image.png
  • 自定义注解
  • 设计模式之工厂方法
  • 反射技术

初始化

应用重启的时候,扫描所有的注解及方法,读取上面的属性,然后缓存到集合。为后面的工厂方法做准备。

  • 类注册
  • 方法注册


    image.png

工厂方法

读取请求报文中的appKey和command参数,根据appKey查找到对应的EndpointService实现类,根据command查到对应的Method方法。
使用Method.invoke()方法执行具体的动作。

image.png
// 类Object的集合instanceMapper
Object instance = instanceMapper.getEndpointInstance(appKey);
// 方法Method的结合pointMapper
Method onCommand = pointMapper.getOnCommand(appKey, command);

// instance是指向类,第二个参数是Session,第三个参数text是消息内容
onCommand.invoke(instance, session, text);

七、通道的安全

每次请求都校验token的有效期,如果已过期,则提示重新登录或自动延长有效期。

  • 让客户端输入用户名密码,然后由服务端颁发token,后续的每次请求都由客户端带上该token
  • 如果是没有交互界面,一般由客户端把登录系统的token替代,由通道服务去调用认证服务验证
  • 增加签名字段作为额外的安全校验

八、参考链接

九、存在的问题

  • 没有引入负载均衡层,用户的通道缺少路由,我们采用的mq广播方式,让每个节点都尝试去处理消息,性能消耗不少。
  • 假集群,某个节点挂了,会强制断开和客户端的连接。在不怎么发版的情况下,勉强能接受。
  • 但是集群无法做到消息转发,会有单点故障,节点A的消息不能交由节点B去发送给客户端。

相关文章

网友评论

      本文标题:通道服务的开发设计

      本文链接:https://www.haomeiwen.com/subject/fxkzartx.html