美文网首页
消费端ACK与重回队列

消费端ACK与重回队列

作者: 爱吃豆包 | 来源:发表于2023-03-22 08:58 被阅读0次

补充:

生产者和消息中心交互,就是往消息中心发消息!分为事物方式confirm方式 确认,如果没有这两个方式,就表示我生产者生产消息后,直接就往消息中心发送,就可以了,但我却不知道到底发送成功没有!

消费者和消息中心交互,就是消息中心给消息者消息消费!分为 自动ACK手动ACKNACK(不确认消息,将会重返队列的队首)!说白了就是我消费者要保证这个消息正确的消费!

自动ACK: 就表示我消费者拿到消息后会立马给消息中心说,我收到了!然后在去根据消息处理 我们的业务。
手动ACK: 就是,当我消息接收到消息后,不会马上和消息中心说,我收到了!而是等着我们业务处理完,我们在手动ACK告诉消息中心说,我收到了!
NACK (不确认消息): 就是我处理业务消息的时候,出现了业务异常,导致这个消息处理失败,那么也就表示这个消息消费不成功,但是既然不成功那么我要让它回到消息队列中,那么我就 NACK ,这个消息就会自动回到队首!

消费者 和 生产者 在交互上面,本质是和消息中心交互的!而不是 消费者 和 生产者直接交互,中间有一层MQ消息中心!

=========================================================

ACK 确认消息
NACK 不确认消息

消费端的自动ACK,手工 ACK(确认) 和 NACK(不确认, 处理失败,重新投入),

使用场景:

消费端进行消费的时候,如果由于业务异常我们可以进行日志的记录,然后进行补偿!

当我们的业务异常出现,我们可以用最大努力补偿,捕获到业务异常我直接 NACK 不确认消息,让它自动进行重发,一直进行个两三次 NACK 后,如果到了第四次还是失败的,那我们就直接写入日志里面,后面进行人工干预补偿!(比如人工给这个用户增加积分)!

NACK 后这个消息会放到消息中心的队首(也可能是队尾,忘记了,好像是队尾),那么下一次消息,就又会这条消息!

如果由于服务器宕机等严重问题,那我们就需要手工ACK保障消费端消费成功!

比如说,我这个消息消费了一半了,服务器突然挂了,我们MQ中心,就接收不到 ACK 也接收不到 NACK ,那么我们这个消息就会进行重发!当消息消费到一半的时候,算失败的,因为我MQ消息中心没有接受到真正的确认应答!

也就是说,我消费者接受到消息后,自动ACK,给MQ消息中心发消息说,我收到了!但是如果我自动ACK的话,消费消息的时候失败了,那么我这个消息也就没有了,因为我消费端一拿到消息就马上给消息中心说我收到了!

所以这个时候我需要的时候手工ACK,当消费者收到消息的时候,不自动给消息中心发消息说我收到了,而是等着消费端把消息处理成了,在给消息中心发ACK确认,这是手工ACK

消费端的重回队列

消费端重回队列是为了对没有处理成功的消息,把消息重新会递给Broker(消息中心)!
一般我们在实际使用中,都会关闭重回队列,也就是设置为False

生产者

 package com.example.rabbitmqapi.ack;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.util.HashMap;
import java.util.Map;

/**
 * 生产者
 *
 *      在消费端的 ACK 和 重回队列
 *
 * @author weiximei
 */
public class Producer {

    public static void main(String[] args) throws Exception {
        // 1. 创建ConnectionFactory, 并设置属性
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.72.138");
        factory.setPort(5672);
        factory.setVirtualHost("/");

        // 2. 创建连接
        Connection connection = factory.newConnection();

        // 3. 创建channel
        Channel channel = connection.createChannel();


        String exchangeName = "test_ack_exchange";
        String routingKey = "ack.saye";

        // 发送消息
        String msg = "自定义消费者, 消息发送 : Hello, weiximei";
        for (int i = 0; i < 5; i++) {

            // 携带额外的参数
            Map<String,Object> headerMap = new HashMap<>();
            headerMap.put("num", i);

            // 添加额外的属性
            AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                    .deliveryMode(2) // 是否持久化 1 不持久化,2 持久化
                    .contentEncoding("UTF-8")  // 设置字符集
                    .headers(headerMap).build();

            /**
             * mandatory 如果为true,则监听器会接收到路由不可达的消息,然后进行后续处理,如果为false,那么broker端自动删除该消息!
             */
            // exchange 表示交换机, 不设置交换机就输入空字符串, 就表示走第一个默认的交换机(AMQP default), 也就是说 routingKet 会和交换机绑定一起
            // routingKet 表示key键,发送到哪一个队列
            // mandatory 如果为true,则监听器会接收到路由不可达的消息,然后进行后续处理,如果为false,那么broker端自动删除该消息!
            // props 表示消息的其他属性 (BasicProperties)
            // body 表示消息内容
            channel.basicPublish(exchangeName, routingKey, true, null, msg.getBytes());
        }

        // 关闭连接
        channel.close();
        connection.close();

    }

}



消费者

 package com.example.rabbitmqapi.ack;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * 消费者
 *
 *      消费端 ACK 和 重回队列
 *
 * @author weiximei
 */
public class Consumer {

    public static void main(String[] args) throws Exception {
        // 1. 创建连接工厂并设置属性
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.72.138");
        factory.setPort(5672);
        factory.setVirtualHost("/");

        // 2. 创建连接
        Connection connection = factory.newConnection();

        // 3. 创建channel
        Channel channel = connection.createChannel();

        // 4. 声明Exchange
        String exchangeName = "test_ack_exchange";
        String exchangeType = "topic";
        String routingKey = "ack.*";
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, null);

        // 5. 声明消息队列
        String queueName = "test_ack_queue";
        channel.queueDeclare(queueName, true, false, false, null);

        // 6. 绑定队列和Exchange
        channel.queueBind(queueName, exchangeName, routingKey);


        // 7. 设置消费者为自定义的消费者, 手工签收ACK,则必须 autoAck=false
        channel.basicConsume(queueName, false, new MyConsumer(channel));

    }

}



自定义消费

 package com.example.rabbitmqapi.ack;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;

/**
 * 自定义消费者
 *
 * @author weiximei
 */
public class MyConsumer extends DefaultConsumer {

    private Channel channel;

    public MyConsumer(Channel channel) {
        super(channel);
        this.channel = channel;
    }

    // 第一参数:consumerTag 同一个会话, consumerTag 是固定的 可以做此会话的名字, deliveryTag 每次接收消息+1,可以做此消息处理通道的名字。
    //      因此 envelope.deliveryTag 可以用来回传告诉 rabbitmq 这个消息处理成功 清除此消息(basicAck方法)。
    // 第二个参数: envelope 表示消息类型信息
    // 第三个参数:properties 表示消息路由头的其他属性等
    // 第三个参数: body 消息内容
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println("-------------自定义消费者------------");
        System.out.println("consumerTag : " + consumerTag);
        System.out.println("envelope : " + envelope);
        System.out.println("properties : " + properties);
        System.out.println("body : " + new String(body));

        // 获取消息的额外参数
        Integer num = (Integer) properties.getHeaders().get("num");
        // 比如我让这个的额外参数等于 0 的时候,就进行重回队列
        if (num == 0) {
            // deliveryTag 参数是消息唯一标识
            // multiple 是否是批量的,一般为false
            // requeue 是否重回队列, true 是,false 不是
            channel.basicNack(envelope.getDeliveryTag(), false, true);
        } else {

            /**
             * 手工签收, 第二个参数表示是否批量签收
             */
            // envelope.getDeliveryTag() 是消息的唯一标识
            channel.basicAck(envelope.getDeliveryTag(), false);
        }

    }
}


相关文章

网友评论

      本文标题:消费端ACK与重回队列

      本文链接:https://www.haomeiwen.com/subject/yjcokdtx.html