美文网首页Docker容器Amazing ArchDocker容器技术
docker环境下RabbitMQ实现延迟队列(使用delay插

docker环境下RabbitMQ实现延迟队列(使用delay插

作者: 炒面Z | 来源:发表于2019-07-25 20:19 被阅读3次

    需求场景: 处理一个超时订单

    以下解决方案是使用的rabbitMq是docker环境下部署
    需要使用到rabbitMq的rabbitmq_delayed_message_exchange插件
    另外( RocketMq自带18个级别的超时配置)
    messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 有兴趣的朋友们可以去试试

    • 在docker环境下的rabbitMq中安装延迟插件

    1. 进入docker容器内 docker exec  -t rabbit  bash
    2. rabbitmq-plugins list 命令查看已安装插件
    3. 在插件网址找到延迟插件的下载地址 http://www.rabbitmq.com/community-plugins.html 
    4. exit 退出容器到宿主机中,下载插件: wget  https://dl.bintray.com/rabbitmq/community-plugins/3.7.x/rabbitmq_delayed_message_exchange/rabbitmq_delayed_message_exchange-20171201-3.7.x.zip
    5. 解压 unzip XXX.zip -d . 
    6. 拷贝至docker容器内: docker cp xxx.xz rabbit:/plugins
    7. 再次进入docker容器内: 进入docker容器内 docker exec  -t rabbit  bash
    8. 执行命令让插件生效: 启动延时插件:rabbitmq-plugins enable rabbitmq_delayed_message_exchange
    
    • java项目中的应用

    image.png
    • DelayedConfig.java
    @Configuration
    public class DelayedConfig {
        final static String QUEUE_NAME = "delayed.live.queue";
        final static String EXCHANGE_NAME = "delayed.live.exchange";
    
        @Bean
        public Queue queue() {
            return new Queue(DelayedConfig.QUEUE_NAME);
        }
    
        // 配置默认的交换机
        @Bean
        CustomExchange customExchange() {
            Map<String, Object> args = new HashMap<>();
            args.put("x-delayed-type", "direct");
            //参数二为类型:必须是x-delayed-message
            return new CustomExchange(DelayedConfig.EXCHANGE_NAME, "x-delayed-message", true, false, args);
        }
    
        // 绑定队列到交换器
        @Bean
        Binding binding(Queue queue, CustomExchange exchange) {
            return BindingBuilder.bind(queue).to(exchange).with(DelayedConfig.QUEUE_NAME).noargs();
        }
    }
    
    • DelayedReceiver
    @Slf4j
    @Component
    @RabbitListener(queues = DelayedConfig.QUEUE_NAME)
    public class DelayedReceiver {
        @RabbitHandler
        public void process(String msg) {
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            log.info("接收时间:" + sdf.format(new Date()));
            log.info("消息内容:" + msg);
        }
    }
    
    • DelayedSender
    @Slf4j
    @Component
    public class DelayedSender {
        @Autowired
        private AmqpTemplate rabbitTemplate;
    
        public void send(String msg,Integer delaySeconds) {
            SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            log.info("发送时间:" + sf.format(new Date()));
    
            rabbitTemplate.convertAndSend(DelayedConfig.EXCHANGE_NAME, DelayedConfig.QUEUE_NAME, msg, new MessagePostProcessor() {
                @Override
                public Message postProcessMessage(Message message) throws AmqpException {
                    message.getMessageProperties().setHeader("x-delay", delaySeconds * 1000);
                    return message;
                }
            });
        }
    }
    
    • 测试控制器

    @Slf4j
    @RequestMapping("swapi")
    @RestController
    @Api(value = "SwTimetableLogApi", description = "mq测试", tags = {"mq测试"})
    public class MqController {
    
        @Autowired
        private DelayedSender sender;
    
        @NoLogin
        @ApiOperation(value = "延迟队列测试")
        @PostMapping("/mq/{message}/{delay}")
        public Result messageWithMQ(@PathVariable(value = "message") String message,
                                    @PathVariable(value = "delay") Integer delay) {
            log.info("Send: " + message);
            sender.send(message, delay);
            return Result.ok();
        }
    
    }
    

    相关文章

      网友评论

        本文标题:docker环境下RabbitMQ实现延迟队列(使用delay插

        本文链接:https://www.haomeiwen.com/subject/tfukrctx.html