美文网首页
通道服务的开发设计

通道服务的开发设计

作者: 天草二十六_简村人 | 来源:发表于2022-10-12 15:47 被阅读0次

    一、背景

    在线课堂业务中,参与方的教师、学生和助教等人员,在实时上课的过程中,需要做到消息的及时传达。
    一次上课,可以认为是创建了一个房间,教师把学生都加入到房间,然后就可以在房间里互发消息。
    消息的发送又分为单播和广播两种模式。

    本文主要是概述如何使用通道技术实现一个聊天系统,介绍一些我们公司的设计与实现思路,希望能够给到大家一些帮助。

    二、目标

    • 实现群聊与私聊的聊天系统
    • 支持多种业务形态下的消息通道,做到数据隔离
    • 消息的持久化,支持离线消息
    • 统计和分析房间及用户的活跃、在线情况
    • 支持多种自定义的命令消息

    三、ChannelGroup与Channel

    3.1、channel在分布式环境下的保存问题

    由于它不能存储到redis,因为netty的channel是一个连接,是和机器的硬件绑定的,无法序列化。所以一般有两种做法,前者是在连接层上面再设计一个负载均衡层,由它来寻址转发到哪台后端机器;后者是让命令消息广播,让每个节点都尝试处理,如果当前用户的通道不在当前机器的内存中,则忽略。总之,保证消息有且只有一个后端服务的节点会处理消息。

    3.1.1、使用rabbitmq的广播模式fanout

    所有bind到此exchange的queue都可以接收消息。

    代码示例:

    
        /**
         * 通道队列名
         */
        private String channelQueue = "ykt.channel.queue." + HostUtils.getMac();
        /**
         * 死信队列通道名
         */
        private String deadChannelQueue = "ykt.channel.dead.queue";
        /**
         * 通道交换机名
         */
        private String channelExchange = "ykt.channel.exchange";
    
        /**
         * 死信交换机名
         */
        private String deadExchange = "ykt.channel.dead.letter.exchange";
        /**
         * 死信路由名
         */
        private String deadRoutingKey = "ykt.channel.dead.letter.routeKey";
        /**
         * 通道队列路有名
         */
        private String channelRoutingKey = "ykt.channel.routeKey";
    
        /**
         * 通道队列
         *
         * @param: []
         * @return: org.springframework.amqp.core.Queue
         **/
        @Bean
        public Queue channelQueue() {
            Map<String, Object> params = new HashMap<>();
            params.put("x-message-ttl", 5 * 1000);
            params.put("x-dead-letter-exchange", deadExchange);
            params.put("x-dead-letter-routing-key", deadRoutingKey);
            return new Queue(channelQueue, false, false, false, params);
        }
    
        /**
         * 通道交换机 fanout广播模式
         *
         * @param: []
         * @return: org.springframework.amqp.core.Exchange
         **/
        @Bean
        public Exchange channelExchange() {
            return ExchangeBuilder.fanoutExchange(channelExchange).durable(true).build();
        }
    
        /**
         * 通道队列绑定交换机
         *
         * @param: []
         * @return: org.springframework.amqp.core.Binding
         **/
        @Bean
        public Binding channelBinding() {
            return BindingBuilder.bind(channelQueue()).to(channelExchange()).with(channelQueue).noargs();
        }
    
    

    3.1.2、redis的订阅与广播

    redis支持消息订阅与发布机制,可以使用该机制实现不同服务间的消息转发。
    在广播消息时,需要携带能唯一标识接收者身份的字段(例如clientId)。消息广播结束后,所有服务端会收到该消息,服务端仅仅需要判断该消息接收者的是否是连接的自己作为服务端。若发现该接收者正是连接的自己,则直接将消息转发到该客户端即可。

    3.2、channel和user的映射需要程序自己实现

    建立通道与通道组,和用户与房间的映射关系。通过下面一图可见:

    image.png

    具体实现,我们是使用Map集合,将之存储在Jvm内存中。也就是说,在应用重启的时候,会丢失用户和通道,房间和通道组的关联。随之,也会断开和端建立的通道,用户会需要重新进入房间。
    换句话说,应用在重启的时候,会给用户带来极大的不便,体验方面也影响挺大。这也是为什么IM系统都会在最外面有连接层,极少重启连接层,重启的都是后面的消息处理模块。

    • 用户加入房间的流程


      image.png

      场景补充:userId要加入到roomId,通道建立的时候,我们需要将用户与channel,房间与channelGroup建立映射关系。

    • 用户查找房间的通道


      image.png

      场景补充:用户的通道,是指用户在某个房间下的通道,是脱离不开房间来说通道的。

    3.3、数据结构设计

    保存所有的房间(通道组)及其里面的成员用户(通道)。

        /**
         * key是groupId 
         * value是ChannelGroup
         * 存放ChannelGroup
         * groupId 也就是roomId
         */
        private static Map<String, ChannelGroup> groupIdChannelGroupMap = new ConcurrentHashMap<>();
    
        /**
         * key是groupId 
         * value又是一个Key-Value格式,是每个用户对应的Channel
         * [groupId -> [clientId1->channelId1,clientId2->channelId2,clientId3->channelId3,****],]
        * 存放Channel
         * groupId 也就是roomId
         * clientId 也就是userId
         * channelId 也就是Channel的id
         */
        private static Map<String, Map<String, ChannelId>> groupIdClientIdChannelIdMap = new ConcurrentHashMap<>();
    

    四、Netty的启动流程

    先贴一张图:


    netty架构概念图.png image.png

    配置项

    • Netty启动端口
    • IdleStateHandler 的心跳时间

    五、数模设计

    image.png

    六、多种业务通道的实现

    image.png
    • 自定义注解
    • 设计模式之工厂方法
    • 反射技术

    初始化

    应用重启的时候,扫描所有的注解及方法,读取上面的属性,然后缓存到集合。为后面的工厂方法做准备。

    • 类注册
    • 方法注册


      image.png

    工厂方法

    读取请求报文中的appKey和command参数,根据appKey查找到对应的EndpointService实现类,根据command查到对应的Method方法。
    使用Method.invoke()方法执行具体的动作。

    image.png
    // 类Object的集合instanceMapper
    Object instance = instanceMapper.getEndpointInstance(appKey);
    // 方法Method的结合pointMapper
    Method onCommand = pointMapper.getOnCommand(appKey, command);
    
    // instance是指向类,第二个参数是Session,第三个参数text是消息内容
    onCommand.invoke(instance, session, text);
    

    七、通道的安全

    每次请求都校验token的有效期,如果已过期,则提示重新登录或自动延长有效期。

    • 让客户端输入用户名密码,然后由服务端颁发token,后续的每次请求都由客户端带上该token
    • 如果是没有交互界面,一般由客户端把登录系统的token替代,由通道服务去调用认证服务验证
    • 增加签名字段作为额外的安全校验

    八、参考链接

    九、存在的问题

    • 没有引入负载均衡层,用户的通道缺少路由,我们采用的mq广播方式,让每个节点都尝试去处理消息,性能消耗不少。
    • 假集群,某个节点挂了,会强制断开和客户端的连接。在不怎么发版的情况下,勉强能接受。
    • 但是集群无法做到消息转发,会有单点故障,节点A的消息不能交由节点B去发送给客户端。

    相关文章

      网友评论

          本文标题:通道服务的开发设计

          本文链接:https://www.haomeiwen.com/subject/fxkzartx.html