美文网首页
SpringCloud之Stream-7.延迟消息

SpringCloud之Stream-7.延迟消息

作者: 那钱有着落吗 | 来源:发表于2021-10-29 14:26 被阅读0次

    首先先讲一个例子:

    延迟消息概念

    延迟消息很好理解,就是一种不会立即被消费,而是延迟到未来某个时间点才能被消费的消息类型

    阿里新零售业务库存发布

    在电商场景里商品库存是一个核心数据,上下游很多链路都围绕库存做足了文章。在我们的业务有两个和库存紧密联系的业务名词

    l 今日/明日库存 今日库存就是当日可供售卖的库存量,同理,明日库存就是下一日的可用库存量

    l 库存计划 运营和采购要对商品的上架数量做出评估,然后整理出未来几天的库存计划表,这个计划表决定了未来每日新增库存的数量

    从上面两个名词,我们不难看出来“今日”与“明日”的界限是个关键事件点,也就是说,库存计划需要在每日的0点准时生效,为第二日的商品发布新的库存数量。这个场景天然适合使用延迟消息来实现,我们当时的流程设计是这样的。

    image.png

    l 库存中心 由运营添加库存计划到库存中心,后台服务会将库存发布计划作为延迟消息发送到消息组件。至于要延迟多长时间生效,则是用库存生效时间减去当前时间计算出来的。比如明日库存计划的生效时间是0点减去当前时间

    l 消息中间件 消息中间件在接收到库存发布消息后,根据其中设置的延迟生效时间向消息队列中添加新消息

    l 商品服务 所有商品服务作为一个消费者分组,订阅了消息组件中的库存发布消息,一旦有新消息到来,这条消息就会被该分组内的某个服务消费

    开启延迟消息

    打开RabbitMQ官方的插件下载页面
    https://www.rabbitmq.com/community-plugins.html
    网址,从中找到
    rabbitmq_delayed_message_exchange
    这个插件。

    image.png

    上面就是延迟插件的下载列表,不同版本的RabbitMQ对应不同版本的插件,同学们先看一下你的安装版本,然后选择对应的插件下载。比如我的RabbitMQ版本是3.7.15,所以选择3.7.x版本的插件。

    安装插件

    下载完成后将插件解压,把解压后的文件copy到RabbitMQ安装目录下的plugins文件夹。然后运行
    rabbitmq-plugins enable rabbitmq_delayed_message_exchange
    这个命令安装插件。安装好后你会看到日志中打印出了这个插件的名称

    rabbitmq_delayed_message_exchange
    The following plugins have been configured:
    rabbitmq_delayed_message_exchange
    rabbitmq_management
    rabbitmq_management_agent
    rabbitmq_shovel
    rabbitmq_shovel_management
    rabbitmq_web_dispatch
    Applying plugin configuration to rabbit@LM-SHC-1650...

    重启服务

    本地直接执行
    rabbitmqctl stop
    命令关闭RabbitMQ(前提是安装路径已经添加到系统变量中,否则要先进入安装目录后再执行该命令)。待完全关闭之后,再执行
    rabbitmq-server
    命令启动服务。
    对Mac系统来说,添加RabbitMQ到系统变量只要修改
    ~/.bash_profile
    添加这行就可以了:
    export PATH={PATH}:RABBIT_HOME/sbin
    。对Windows同理,需要你向系统变量的PATH属性中添加RabbitMQ的路径信息。
    到这一步,准备工作就大功告成了。

    实战

    image.png

    创建一个新的topic类

    
    public interface DelayedTopic {
    
        String INPUT = "delayed-consumer";
    
        String OUTPUT = "delayed-producer";
    
        @Input(INPUT)
        SubscribableChannel input();
    
        @Output(OUTPUT)
        MessageChannel output();
    }
    
    @PostMapping("sendDelayedMsg")
        public void sendDelayedMsg(@RequestParam(value="body")String body,
                                   @RequestParam(value="seconds")Integer seconds){
            MessageBean msg = new MessageBean();
            msg.setPayload(body);
    
            log.info("ready to send delayed message");
    
            delayedTopicProducer.output().send(
                    MessageBuilder.withPayload(msg)
                            .setHeader("x-delay",1000*seconds).build());
        }
    

    监听器也要添加一下

    
    @Slf4j
    @EnableBinding(
            value = {
                    GroupTopic.class,DelayedTopic.class
            }
    )
    public class StreamConsumer {
    
        @StreamListener(GroupTopic.INPUT)
        public void consumeGroupMessage(Object payload){
            log.info("Group message consumed successfully,payload={}",payload);
        }
    
        @StreamListener(DelayedTopic.INPUT)
        public void consumeDelayedMessage(MessageBean messageBean){
            log.info("Delayed message consumed successfully,payload={}",messageBean.getPayload());
        }
    
    
    }
    
    

    添加一下延迟消息的配置,最后的是打开延迟的exchange

    #延迟消息配置
    spring.cloud.stream.bindings.delayed-consumer.destination=delayed-topic
    spring.cloud.stream.bindings.delayed-producer.destination=delayed-topic
    spring.cloud.stream.rabbit.bindings.delayed-producer.producer.delayed-exchange=true
    

    启动之后可以去rabbitmq的界面看一下:


    image.png

    图中的DM意思就是dalayed_message,如果有这个标签就代表是延迟消息

    发送接口做测试:


    image.png image.png

    相关文章

      网友评论

          本文标题:SpringCloud之Stream-7.延迟消息

          本文链接:https://www.haomeiwen.com/subject/eprvaltx.html