美文网首页
基于RabbitMq的实现消息延时发送的优点以及其局限性;

基于RabbitMq的实现消息延时发送的优点以及其局限性;

作者: 名字是乱打的 | 来源:发表于2021-08-21 16:39 被阅读0次

    我们消息中心是负责承载各个业务(比如电商,物流,营销中心,券中心,会员中心,积分中心,停车场等等)的消息发送需求,那么消息呢就可能有延迟需求,比如物流到货后十五分钟进行一次邀请评价的需求。而且做个消息延迟发送,我们不可能让业务自己去写消息啥时候发送,到发送时间了再调我们接口这种逻辑,这样不合理,我们需要做比较强大的消息中心功能。

    那么延迟消息的实现有多种多样的实现,我们前阵子想实现延迟消息,对此做了一定的讨论,最后发现许多方案要么不支持分布式项目,要么平白的对机器性能损耗比较大,要么可能存在系统崩溃数据丢失的风险,最后我们采用了RabbitMQ的方式实现延迟队列。

    一. rabbitmq的延迟消息实现方式

    1.死信队列方式

    1.1我先大白话解释一下啥叫死信队列:
    • 首先死信队列是普通队列
    • 死信队列是在其他队列里的消息死亡后进入的队列
    • 死信队列本身不具有死信功能,需要绑定
    • 比如A绑定了死信队列是B,那么A中死亡的消息就会进入B内,B就被称之为死信队列

    上面提到的消息死亡有几种类型
    消息被拒绝(basic.reject / basic.nack),并且requeue = false
    消息TTL过期
    队列达到最大长度

    1.2延迟队列+死信队列实现延迟消息发送

    RabbitMQ支持给队列内的消息设置过期时间和给消息单独过期时间,那么结合死信队列我们就可以做到消息的延迟发送了;
    大概是以下步骤
    1.创建延迟队列并设置消息的过期时间,绑定一个死信队列
    2.不创建该队列的消费者,让其内部消息根据过期时间自动过期
    3.创建死信队列的消费者,使其每次消费死亡的消息;

    死信队列结构图

    看到之前有的人写的博文写的比较复杂,还把交换机写进来了,其实完全没必要,死信队列根本上只是队列之间的绑定以及数据交换,具体代码就不说了,因为重点不在这里;

    死信这种方式有个致命的缺点,导致我们这边无法使用:
    • 1.时序问题:如果我们消息使用的是同一个队列,然后我们给消息本身设置过期时间,那么同一个队列中消息消费是按顺序来的,而不是过期时间,也就说说如果我们正常队列有两个数据A ttl15秒 B ttl 3秒,A在队列前面B在后面;那么我们消费的时候及时B过期时间更短,我们也不会先消费B而是会先消费A,因为同一队列有顺序问题。
    • 2.过多队列问题
      前面说了如果单个队列那么消费就有顺序问题,那我们可以按过期时间分别绑定多个队列啊。
      但是如果这样使用的话,岂不是每增加一个新的时间需求,就要新增一个队列,这里只有6s和60s两个时间选项,如果需要一个小时后处理,那么就需要增加TTL为一个小时的队列,我们业务方指定的时间是无法确定的,不可能去限制业务方让他们只能某个时间发,那么我们要创建无数个队列??显然这是不可能的;

    因此这种方式直接pass了;

    2.借助rabbitmq的延迟插件

    2.1 延迟队列插件rabbitmq_delayed_message_exchange的安装

    • 1.首先去RabbitMQ 插件下载网站下载自己版本对应的ez文件类型插件

    • 2.插件rabbitmq_delayed_message_exchange下载完放入rabbitmq 的plugins文件下

    • 3.进入到rabbit文件的sbin目录,执行

    #查看插件目录
    rabbitmq-plugins list
    #安装延迟队列插件 (rabbitmq-plugins enable 插件名)
    rabbitmq-plugins enable rabbitmq_delayed_message_exchange
    

    2.2 RabbitMq插件实现消息延迟的优点;

    这个优点就是解决了上面方式的缺点。。。让消息延迟性绑定到消息本身上,使的每个消息有自己的过期时间;

    2.3 实现方式;

    • 1.创建一个delay类型的exchange,绑定对应的队列
    • 2.这里delay类型和direct topic fanout等消息路由方式相对独立,也就是说我们可以和之前一样,指定exchange是direct还是topic还是fanout都可以。根本上来说:我们指定其是delay类型不过是决定了其什么时候投递到指定路由队列
    • 3.绑定路由队列进行消费
    我们目前有两种不同类型的延迟,因此我这里用的topic模式结合延迟插件实现延迟队列; topic结合delay插件实现延迟消息架构

    如上所示,我们延迟的消息首先都放置到Mq里,然后延迟时间到了之后呢就会被路由到指定队列上去;

    这么做有个小问题,如果我们延迟消息过多的话,那么必然存在着rabbitmq挤压消息,占用空间的问题,当然解决方案也比较简单

    • 延迟不超过一天的我们直接进入rabbitmq
    • 把延迟超过第一天的消息先进入mysql,每天定时扫第二天要发的数据,扫进mq里
    • 这样的话RabbitMQ就做到了只存储当天消息的能力;

    如果我们消息非常非常多,可以把消息分发区间划的更细点,比如只存储每12小时的消息,甚至只存储每个小时要发送的消息,这都是完全OK的;

    代码也很简单,这里提供一个绑定了两种业务的延迟队列的小demo:
    延迟队列配置:

    @Configuration
    public class DelayedRabbitMQConfig {
      /**
         * 声明延时队列hangfire
         * 不设置TTL
         */
        @Bean("hDelayQueue")
        public Queue hangfireDelayQueue() {
            return new Queue(BaseDict.DELAY_QUEUE_NAME_H);
        }
    
    
        /**
         * 业务调用延迟队列
         *
         */
        @Bean("mDelayQueue")
        public Queue moduleDelayQueue() {
            return new Queue(BaseDict.DELAY_QUEUE_NAME_M);
        }
    
    
        /**
         * 延迟交换机
         *
         */
        @Bean(name = "delayExchange")
        public CustomExchange delayExchange() {
            Map<String, Object> args = new HashMap<>(2);
            args.put("x-delayed-type", "topic");
            return new CustomExchange(BaseDict.DELAY_EXCHANGE_DISPATCHER, "x-delayed-message", true, false, args);
        }
    
    
        @Bean
        public Binding hangfireBinding(@Qualifier("hDelayQueue") Queue queue,
                                       @Qualifier("delayExchange") CustomExchange customExchange) {
            return BindingBuilder.bind(queue).to(customExchange).with(BaseDict.DELAY_QUEUE_ROUTING_KEY_H).noargs();
        }
    
    
        @Bean
        public Binding moduleBinding(@Qualifier("mDelayQueue") Queue queue,
                                     @Qualifier("delayExchange") CustomExchange customExchange) {
            return BindingBuilder.bind(queue).to(customExchange).with(BaseDict.DELAY_QUEUE_ROUTING_KEY_M).noargs();
        }
    }
    

    消息生产

        @PostMapping("addMsgToMDelayQueue")
        public void addMsgToMQueue(@RequestBody XXX request) {
            sendDelayMsg(BaseDict.DELAY_EXCHANGE_DISPATCHER,BaseDict.DELAY_QUEUE_ROUTING_KEY_M, JSONObject.toJSONString(request), 10*24*60*60*1000);
        }
    
    
    
        @PostMapping("addMsgToHDelay")
        public void addMsgToHQueue(@RequestBody List<AAA> request) {
            sendDelayMsg(BaseDict.DELAY_EXCHANGE_DISPATCHER,BaseDict.DELAY_QUEUE_ROUTING_KEY_H, JSONObject.toJSONString(request), 10*24*60*60*1000);
        }
    
    
        /**
         * 消息发送到延迟交换机上
         */
        public void sendDelayMsg(String exchange,String routingKey, String msg, Integer delayTime) {
            rabbitTemplate.convertAndSend(exchange, routingKey, msg, a -> {
                a.getMessageProperties().setDelay(delayTime);
                return a;
            });
        }
    

    消息消费

    @RabbitListener(queues = BaseDict.DELAY_QUEUE_NAME_H, concurrency = "10-30")
        public void hangfireReceive(Message message, Channel channel) throws IOException {
            final List<AAA> sendDetails = JSONObject.parseArray(JSONObject.toJSONString(message.getBody()), AAA.class);
            Optional.ofNullable(sendDetails).ifPresent(item->item.forEach(System.out::println));
        }
    
    
        @RabbitListener(queues = BaseDict.DELAY_QUEUE_NAME_M, concurrency = "10-20")
        public void receiveQ(Message message, Channel channel) throws IOException {
            final XXX request = JSONObject.parseObject(message.getBody(), XXX.class);
            System.out.println("消费"+request.toString());
        }
    

    3.基于RabbitMQ延迟插件实现延迟消息的局限性

    我们在第一次使用这个延迟插件的时候做了一个压测,大约100W数据量的延迟会导致内存和Cpu使用量的急速上升,查了一些文档没搞明白后,去了官网看了下,发现其对此有以下解释,大致是讲目前这个延迟插件还不足以支持那么大的数据量,建议数据量不要太大
    https://github.com/rabbitmq/rabbitmq-delayed-message-exchange
    https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/issues/72

    因此我们如果想用好延迟插件,目前来说需要做一些额外的配合,尽量使其延时最近的数据,并且数据量维持到一个比较低的程度

    相关文章

      网友评论

          本文标题:基于RabbitMq的实现消息延时发送的优点以及其局限性;

          本文链接:https://www.haomeiwen.com/subject/nbitiltx.html