美文网首页
spring boot 集成rabbitMQ(二)延迟队列

spring boot 集成rabbitMQ(二)延迟队列

作者: 周六不算加班 | 来源:发表于2018-09-06 17:16 被阅读37次

1、引入rabbitMQ相关的jar包

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

2、配置文件设置

spring.rabbitmq.host=xx.xx.xx.xx
spring.rabbitmq.port=xxxx
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

3、RabbitMq配置类

@Configuration
public class RabbitConfig {

/**
 * 方法rabbitAdmin的功能描述:动态声明queue、exchange、routing
 * @param connectionFactory
 * @return
 */
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
    RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
    //声明死信队列(Fanout类型的exchange)
    Queue deadQueue = new Queue("dead_queue");
    // 死信队列交换机
    FanoutExchange deadExchange = new FanoutExchange("exchange_dead_queue");
    rabbitAdmin.declareQueue(deadQueue);
    rabbitAdmin.declareExchange(deadExchange);
    rabbitAdmin.declareBinding(BindingBuilder.bind(deadQueue).to(deadExchange));

    // 正常队列Direct交换机
    DirectExchange exchange = new DirectExchange("direct_exchange");
    Queue couponQueue = queue("direc_queue");
    //声明消息队列(Direct类型的exchange)
    rabbitAdmin.declareQueue(couponQueue);
    rabbitAdmin.declareExchange(exchange);
    //绑定交换机路由
    rabbitAdmin.declareBinding(BindingBuilder.bind(couponQueue).to(exchange).with("routing"));
    return rabbitAdmin;
}

public Queue queue(String name) {
    Map<String, Object> args = new HashMap<>();
    // 设置死信队列
    args.put("x-dead-letter-exchange", "exchange_dead_queue");
    args.put("x-dead-letter-routing-key", "dead_routing");
    // 设置消息的过期时间, 单位是毫秒
    args.put("x-message-ttl", 5000);

    // 是否持久化
    boolean durable = true;
    // 仅创建者可以使用的私有队列,断开后自动删除
    boolean exclusive = false;
    // 当所有消费客户端连接断开后,是否自动删除队列
    boolean autoDelete = false;
    return new Queue(name, durable, exclusive, autoDelete, args);
}

}

4、发送端

@RestController
@RequestMapping("/mq")
public class RabbitmqController {
    @Autowired
    private RabbitSender rabbitSender;

    @RequestMapping("/send")
    public Object sendMsg(String name) {
        SendMessage sendMessage = new SendMessage();
        sendMessage.setId(1);
        sendMessage.setAge(20);
        sendMessage.setName(name);
        JSONObject jsonObject = (JSONObject)JSONObject.toJSON(sendMessage);
        String content = jsonObject.toJSONString();
         //绑定交换机和路由,把对象通过json转成string传递
        rabbitSender.sendMessage("direct_exchange", "routing", content);
        return "发送成功";
    }
}

5、接收端,监听死信队列实现延迟的效果

@RabbitListener(queues = "direc_queue")
 public void process(String sendMessage, Channel channel, Message message) throws Exception {
    try {
        // 参数校验
        Assert.notNull(sendMessage, "sendMessage 消息体不能为NULL");

        // TODO 处理消息

        // 确认消息已经消费成功
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    } catch (Exception e) {

        try {
            // TODO 保存消息到数据库

            // 确认消息已经消费成功
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception dbe) {

            // 确认消息将消息放到死信队列
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
        }
    }
}

相关文章

网友评论

      本文标题:spring boot 集成rabbitMQ(二)延迟队列

      本文链接:https://www.haomeiwen.com/subject/gfixgftx.html