美文网首页消息中间件踩坑
消息中间件踩坑之旅(三)——RabbitMq延时任务处理

消息中间件踩坑之旅(三)——RabbitMq延时任务处理

作者: 简单DI年华 | 来源:发表于2019-01-26 21:35 被阅读0次

    场景描述

    • 在订单系统中,一个用户下单之后通常有30分钟的时间进行支付,如果30分钟之内没有支付成功,那么这个订单将进行异常处理。

    • 用户希望通过手机远程遥控家里的智能设备在指定的时间进行工作。这时候就可以将用户的指令发送到消息队列里,进行延时处理。

    如何实现?

    因为博主知识浅薄,所以这里只提供两种简单的解决方案
    方案一:使用延迟队列
    用到知识:

    死信队列 http://www.rabbitmq.com/dlx.html

    • 当该队列中的消息被拒绝、过期、或者队列达到最大长度。消息就会变成死信,然后被重新发送到另一个转换器。
    • 通过x-dead-letter-exchange设置转发的转换器
    • 可以携带routing-key ---- x-dead-letter-routing-key

    过期时间TTL http://www.rabbitmq.com/ttl.html

    • 故名思意就是设置一个时间,超过这个时间,消息就过期了。
    • 其实这里有很多种设置方案,这里采用最合适的设置 message-ttl 值,这是在队列属性上设置的
    image

    一下代码基于SpringBoot

    @Bean////订单首次存在的死信队列
    public Queue TradeQueue() {
        Map<String,Object> map = new HashMap<>();
        map.put("x-dead-letter-exchange","DLXExchange");  //设置重新发送的转换器名
        map.put("x-message-ttl",6000);//毫秒为单位          //过期时间
        return new Queue("TradeQueue",true,false,false,map);
    }
    
    @Bean//订单在死信队列过期后去的队列
    public Queue DLXQueue() {
        Map<String,Object> map = new HashMap<>();
        return new Queue("DLXQueue",true,false,false,map);
    }
    
    //两个转换器声明
    @Bean
    DirectExchange TradeExchange() {
        DirectExchange exchange = new DirectExchange("TradeExchange", true, false);
        return exchange;
    }
    
    @Bean
    FanoutExchange DLXExchange() {
        return new FanoutExchange("DLXExchange",true, false);
    }
    
    @Bean//订单首次存在的死信队列交换器绑定
    Binding bindingExchangeMessage1(Queue TradeQueue, DirectExchange TradeExchange) {
        return BindingBuilder.bind(TradeQueue).to(TradeExchange).with("trade");
    }
    
    @Bean//订单首次存在的死信队列出来后经过的交换器绑定
    Binding bindingExchangeMessage2(Queue DLXQueue, FanoutExchange DLXExchange) {
        return BindingBuilder.bind(DLXQueue).to(DLXExchange);
    }
    
    @Autowired
    private AmqpTemplate rabbitTemplate;
    
    public void send() {
        //测试发送
        //订单号
        int tradeId = 9518;
        //发送单号去死信队列做延时操作
       rabbitTemplate.convertAndSend("TradeExchange","trade",tradeId);
        System.out.println(new Date()+"---成功发送订单号到死信队列");
    
    }
    
    @RabbitListener(queues = "DLXQueue")    //接收来到DLXQueue的消息,及时TradeQueue过期后传来的消息,然后进行处理
    public void process2(int str) {
        System.out.println("Receiver  : " +str);
        System.out.println(new Date()+"---成功接收已经过期订单号");
        System.out.println("处理操作。。。。。");
    
    }
    

    运行结果 时间相差和之前设置的一样,从而达到效果

          Mon Jan 21 00:51:03 CST 2019---成功发送订单号到死信队列
          Receiver  : 9518
          Mon Jan 21 00:51:09 CST 2019---成功接收已经过期订单号
          处理操作。。。。。
    

    剩下一种方案 后续更新 是采用

    Map<String, Object> args = new HashMap<>();
    args.put("x-delayed-type", "direct");
    ///。。。。。
     message.getMessageProperties().setDelay(3000);   // 毫秒为单位,指定此消息的延时时长
    

    相关文章

      网友评论

        本文标题:消息中间件踩坑之旅(三)——RabbitMq延时任务处理

        本文链接:https://www.haomeiwen.com/subject/zudwjqtx.html