美文网首页微服务个人学习
RabbitMQ可靠消息和死信队列

RabbitMQ可靠消息和死信队列

作者: CJ21 | 来源:发表于2021-05-01 23:42 被阅读0次

一、消息确认机制

1.1 概念

保证消息不丢失,可靠抵达,可以使用的方式如下

  • 使用事务消息:通过事务保证消息不丢失,但是性能下降250倍
  • 确认机制:publisher: confirmCallback 确认模式publisher: returnCallback 为投递到queue退回模式consumer: ack机制 image.png

1.2. 具体实现

2.1 从publisher到exchange

原理:消息只要被broker接受就会执行confirmCallback,如果是cluster模式,需要所有的broker接受到才会调用confirmCallback。

yml配置

spring.rabbitmq.publisher-confirm-type=correlated

收信后回调操作:

    /**
     * 设置消息接受的回调方法
     */
    @PostConstruct
    public void initRabbitTemplate() {

        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            /**
             * 设置消息抵达exchange的回调方法
             * @param correlationData 当前消息的唯一关联数据(消息的唯一id)
             * @param ack 消息是否成功收到
             * @param cause 失败的原因
             */
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                System.out.println("confirm...correlationData[" + correlationData + "]==>ack[" + ack + "]");
            }
        });

    }

CorrelationData:用来表示当前消息的唯一性,可以在发送消息时进行设置 rabbitTemplate.convertAndSend("test-exchange", "test.java", car, new CorrelationData(UUID.randomUUID().toString()));

2.2 从exchange到queue

原理:我们要保证exchange中的消息要投递到目标queue中,需要开启return退回模式。

yml配置

spring.rabbitmq.publisher-returns=true
spring.rabbitmq.template.mandatory=true

exchange发送到queue失败后回调操作:

    /**
     * 设置消息接受的回调方法
     */
    @PostConstruct
    public void initRabbitTemplate() {

        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            /**
             * 只要消息没有投递给指定的队列,就触发这个失败回调
             * @param message 投递失败的消息
             * @param replyCode 回复的状态码
             * @param replyText 回复的文本内容
             * @param exchange 这个消息发送给那个交换机
             * @param routingKey 消息使用的是哪个路由键
             */
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                System.out.println("Fail Message[" + message + "]-->replyCode[" + replyCode + "]-->replyText[" + replyText + "]-->exchange[" + exchange + "]-->routingKey[" + routingKey + "]");
            }
        });

    }
  • 可以在回调方法中将发送失败的信息存储到MySQL中

2.3 从queue到consumer

原理:
消费者获取到消息,成功处理,可以回复Ack给Broker

  • basic.ack用于肯定回复;broker将移除此消息
  • basic.nack用于否定回复;可以指定broker是否丢弃消息,可以批量
  • basic.reject用于否定回复;同上,但不能批量

默认情况下Broker的消息发送给queue后会自动Ack,删除该消息。但是如果无法确定此消息是否被处理完成或成功处理。我们可以手动开启ack模式:

  • 消息处理成功,ack(),接受下一个消息,broker删除该消息
  • 消息处理失败,nack()/reject(),重新发送给其他人进行处理,或者容错后ack
  • 消息一直没有调用ack/nack方法,broker认为此消息被其他consumer处理,不会投递给别人,此时consumer断开,消息不会被broker删除,会投递给别人。

配置

spring.rabbitmq.listener.simple.acknowledge-mode=manual

二、延时队列和死信队列

2.1 延时队列

消息的TTL(Time To Live)
消息的TTL就是消息的存活时间。RabbitMQ可以对队列和消息分别设置TTL。

  • 对队列设置TTL:队列没有消费者连接的过期时间。
  • 对消息设置TTL:超时后该消息就是死信。

如果队列和消息都设置了TTL,那么会取最小的。通过设置消息的expiration字段或者x-message-ttl属性来设置时间,两者效果相同。

2.2 死信队列

死信情况包括如下三种:

  • 消息被consumer拒收(unack或reject)且requeue是false;
  • 消息TTL超时未消费;
  • 队列长度满了,排在前面的消息会被丢弃或发送到死信exchange中。
  • 超过最大重试次数

DLE(Dead Letter Exchange)
死信exchange是一种普通的exchange,只是所有的死信都会自动从队列中发送到该exchange中。

2.3 应用场景

场景:比如未支付订单,超时一定时间后,系统自动取消订单并释放占有的物品。

解决方案:

  • Spring的schedule定时或xxl中间件定时任务:消耗系统内存、增加数据库压力、存在较大的时间误差;
  • RabbitMQ的消息TTL和死信Exchange结合

如果订单超时时间为1小时,即为消息设置1小时的TTL时间且为队列设置DLE和死信路由键,消息超时过期后发送到DLE,根据死信路由键路由到死信队列中,从死信队列中获得的消息就是超时订单。

实现一:
给队列中的所有消息设定TTL时间,到达队列的时间时开始计时。

image.png
        // 创建信息过期时间为1m的队列
        HashMap<String, Object> arguments = new HashMap<>();
        arguments.put("x-dead-letter-exchange", "order-event-exchange"); // 指定死信交换机的名字
        arguments.put("x-dead-letter-routing-key", "order.release.order"); // 指定死信的路由键
        arguments.put("x-message-ttl", 10000); // TTL时间设为1m
        Queue delayQueue = new Queue("order.delay.queue", true, false, false, arguments);
        amqpAdmin.declareQueue(delayQueue);

实现二:
给发布者发送的每条消息设施TTL时间,缺点是前一个过期处理之后才会处理后续的过期数据,存在过期数据处理不及时的情况。

image.png
    @Test
    public void sendOrderCreate() {
        MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setExpiration("60000");
                message.getMessageProperties().setContentEncoding("UTF-8");
                return message;
            }
        };
        Order book = new Order(1, "book");
        rabbitTemplate.convertAndSend("order-event-exchange", "order.create.order", book, messagePostProcessor, new CorrelationData(UUID.randomUUID().toString()));
    }

注意点:如果创建队列成功后,代码中修改队列的属性,是不会覆盖原有属性的,需要删除后再次创建队列。

2.4 实现

2.4.1 流程

①定义交换机(普通信息和死信共用)、延时队列(参数指定死信交换器、死信路由和延时时间)和死信队列。
②绑定交换器和延时队列与死信队列。
③发送消息到交换机,路由到延时队列。
④不消费延时队列信息,信息过期后发送到死信交换器,交换器通过死信路由发送到死信队列。
⑤消费死信队列获得死信信息。

2.4.2 代码

依赖

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

配置类

@Configuration
public class RabbitMqConfig {

    @Resource
    private AmqpAdmin amqpAdmin;

    @Resource
    private RabbitTemplate rabbitTemplate;

    /**
     * 注入并使用指定的消息转换器(将序列化传输转变为json传输)
     *
     * @return
     */
//    @Bean
//    public Jackson2JsonMessageConverter converter(){
//        return new Jackson2JsonMessageConverter();
//    }

    /**
     * 设置消息接受的回调方法
     */
    @PostConstruct
    public void initRabbitTemplate() {
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            /**
             * 设置消息抵达exchange的回调方法
             * @param correlationData 当前消息的唯一关联数据(消息的唯一id)
             * @param ack 消息是否成功收到
             * @param cause 失败的原因
             */
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                System.out.println("confirm...correlationData[" + correlationData + "]==>ack[" + ack + "]");
            }
        });

        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            /**
             * 只要消息没有投递给指定的队列,就触发这个失败回调
             * @param message 投递失败的消息
             * @param replyCode 回复的状态码
             * @param replyText 回复的文本内容
             * @param exchange 这个消息发送给那个交换机
             * @param routingKey 消息使用的是哪个路由键
             */
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                System.out.println("Fail Message[" + message + "]-->replyCode[" + replyCode + "]-->replyText[" + replyText + "]-->exchange[" + exchange + "]-->routingKey[" + routingKey + "]");
            }
        });
    }


    /**
     * 定义交换机,参数说明
     * 1. name 交换机名称
     * 2. durable 是否持久化,如果持久化,mq重启后交换机还在
     * 3. autoDelete 自动删除,交换机没有绑定队列则删除,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)
     * 4. arguments 参数,可以设置一个队列的扩展参数,比如设置存活时间
     */
    @PostConstruct
    public void createExchange() {
        // 创建交换机(同时作为订单信息交换机和死信交换机)
        DirectExchange directExchange = new DirectExchange("order-event-exchange", true, false);
        amqpAdmin.declareExchange(directExchange);
    }

    /**
     * 定义队列,参数说明
     * 1. queue 队列名称
     * 2. durable 是否持久化,如果持久化,mq重启后队列还在
     * 3. exclusive 是否独占连接,队列只允许在该连接中访问,如果连接关闭队列自动删除(如果将此参数设置true可用于临时队列的创建)
     * 4. autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)
     * 5. arguments 参数,可以设置一个队列的扩展参数,比如设置存活时间
     */
    @PostConstruct
    public void createQueue() {
        // 创建信息过期时间为1m的队列
        HashMap<String, Object> arguments = new HashMap<>();
        arguments.put("x-dead-letter-exchange", "order-event-exchange"); // 指定死信交换机的名字
        arguments.put("x-dead-letter-routing-key", "order.release.order"); // 指定死信的路由键
        arguments.put("x-message-ttl", 10000); // TTL时间设为1m
        Queue delayQueue = new Queue("order.delay.queue", true, false, false, arguments);
        amqpAdmin.declareQueue(delayQueue);

        // 创建死信队列
        Queue releaseQueue = new Queue("order.release.order.queue", true, false, false);
        amqpAdmin.declareQueue(releaseQueue);
    }

    /**
     * 定义队列,参数说明
     * 1. destination 绑定的队列或者交换机的名字
     * 2. destinationType 需要绑定的类型
     * 3. exchange 交换机的名字
     * 4. routingKey 路由键
     * 5. arguments 参数,可以设置一个队列的扩展参数
     */
    @PostConstruct
    public void createBinding() {
        // 交换机order-event-exchange与订单创建队列order.delay.queue绑定,路由键为order.create.order
        Binding createBinding = new Binding("order.delay.queue", Binding.DestinationType.QUEUE, "order-event-exchange", "order.create.order", null);
        amqpAdmin.declareBinding(createBinding);

        // 交换机order-event-exchange与死信队列order.release.order.queue绑定,路由键为order.release.order
        Binding releaseBinding = new Binding("order.release.order.queue", Binding.DestinationType.QUEUE, "order-event-exchange", "order.release.order", null);
        amqpAdmin.declareBinding(releaseBinding);
    }

}

监听死信队列

@Service
public class RabbitListenerService {

    @RabbitListener(queues = "order.release.order.queue")
    public void consumer1(Message message, Order order, Channel channel) throws IOException {
        System.out.println(order);

        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        channel.basicAck(deliveryTag,false);
    }

}

发送信息

    @Test
    public void sendOrderCreate() {
        Order book = new Order(1, "book");
        rabbitTemplate.convertAndSend("order-event-exchange", "order.create.order",book, new CorrelationData(UUID.randomUUID().toString()));
    }

三、 硬件

image.png

3.1 内存

默认如果rabbitmq使用超过物理内存的40%,会报警并阻塞所有队列。
可以通过配置文件或输入命令来改变默认的配置
命令方式:

  1. 如果使用内存超过90MB则报警:rabbitmqctl set_vm_memory_high_watermark absolute 90MB
  2. 如果使用内存超过物理内存的40%则报警:rabbitmqctl set_vm_memory_high_watermark 0.4

3.2 磁盘

磁盘剩余空间低于阈值时,同样会阻塞生产者,避免因非持久化的消息持续换页导致服务器磁盘耗尽而崩溃。

  1. 如果磁盘空间剩余小于100GB则报警:rabbitmqctl set_disk_free_limit 100GB
  2. 如果磁盘空间剩余小于内存的1.5倍,则报警:rabbitmqctl set_disk_free_limit memory_limit 1.5

3.3 内存换页

在某个broker节点及内存阻塞生产者之前,它会尝试将队列中的消息换页到磁盘以释放内存空间,持久化和非持久化的消息都会写入到磁盘中,其中持久化的消息本身就在磁盘胡中有一个副本,所以在转移过程中持久化的消息会先从内存中清除掉。

默认情况下,内存到达的阈值是50%时进行换页,即在默认情况下物理内存使用超过0.4*0.5=0.2时,进行换页。
可以通过设置vm_memory_high_watermark_paging_ratio进行调整。

官网配置

四、附

常用配置

spring:
  rabbitmq:
    host: 192.168.32.207
    port: 5672
    # addresses: 192.168.32.207:5672,... # 配置集群的地址
    username: guest
    password: guest
    virtual-host: /transaction_demo
    publisher-confirm-type: correlated
    publisher-returns: true
    template:
      mandatory: true
    listener:
      simple:
        acknowledge-mode: manual
        #retry:
          #enabled: true # 开启重试
          #max-attempts: 10 # 最大重试次数
          #initial-interval: 2000m # 重试间隔时间

消费者retry重试消费消息后要放入死信队列,就不能将acknowledge-mode设为manual,否则消息一直是unack状态。

完整配置

spring:
  rabbitmq:
    host: 192.168.32.207
    # addresses: # 配置集群的地址
    port: 5672
    username: guest
    password: guest
    virtual-host: /transaction_demo
    publisher-confirm-type: correlated
    publisher-returns: true
    requested-heartbeat: # 指定心态超时,单位秒,0为不指定:默认60s
    connection-timeout: # 连接超时,单位毫秒,0表示无穷大,不超时
    cache:
      channel:
        size: # 缓存中保存的channel数量
        checkout-timeout: # 当缓存数量被设置时,从缓存中获取一个channel的超时时间,单位毫秒;如果为0,则总是创新一个新channel
      connection:
        size: # 缓存的链接数,只有是CONNECTION模式时生效
        mode: # 连接工厂缓存模式:CHANNEL和CONNECTION
    template:
      mandatory: true
      receive-timeout: # receive()操作的超时时间
      reply-timeout: # sendAndReceive()操作的超时时间
      retry:
        enabled: # 发送重试是否可用
        max-attempts: #最大重试次数
        initial-interval: # 第一次和第二次尝试发布或传递消息之间的间隔
        multiplier: # 应用于上一重试间隔的乘数
        max-interval: # 最大重试时间间隔
    listener:
      simple:
        acknowledge-mode: manual
        retry:
          enabled: true # 开启重试
          max-attempts: 10 # 最大重试次数
          initial-interval: 2000ms # 重试间隔时间
          multiplier: # 应用于上一重试间隔的乘数
          max-interval: # 最大重试时间间隔
          stateless: # 重试是有状态or无状态
        auto-startup: # 是否启动时自动启动容器
        concurrency: # 最小的消费者数量
        max-concurrency: # 最大的消费者数量
        prefetch: # 指定一个请求能处理多少个消息,如果有事务的话,必须大于等于transaction数量
        transaction-size: # 指定一个事务处理的消息数量,最好小于等于prefetch的数量
        default-requeue-rejected: # 决定被拒绝的消息是否重新入队,默认是true(与参数acknowledge-mode有关系)
        idle-event-interval: # 多少长时间发布空闲容器时间,单位毫秒

面试题

RabbitMQ为什么需要信道,为什么不是TCP直接通信:

  1. TCP的创建和销毁开销大,创建要三次握手,销毁要四次分手。每个线程都开一个TCP连接,造成底层操作系统处理繁忙;
  2. 信道的原理是一条线程一个信道,多条线程多条信道同用一条TCP连接,一条TCP连接可以容纳无限的信道,即使每秒成千上万的请求也不会成为性能瓶颈。

五、RabbitMQ集群搭建

......待续

相关文章

网友评论

    本文标题:RabbitMQ可靠消息和死信队列

    本文链接:https://www.haomeiwen.com/subject/yfefhktx.html