美文网首页
消费端ACK与重回队列

消费端ACK与重回队列

作者: 爱吃豆包 | 来源:发表于2023-03-22 08:58 被阅读0次

    补充:

    生产者和消息中心交互,就是往消息中心发消息!分为事物方式confirm方式 确认,如果没有这两个方式,就表示我生产者生产消息后,直接就往消息中心发送,就可以了,但我却不知道到底发送成功没有!

    消费者和消息中心交互,就是消息中心给消息者消息消费!分为 自动ACK手动ACKNACK(不确认消息,将会重返队列的队首)!说白了就是我消费者要保证这个消息正确的消费!

    自动ACK: 就表示我消费者拿到消息后会立马给消息中心说,我收到了!然后在去根据消息处理 我们的业务。
    手动ACK: 就是,当我消息接收到消息后,不会马上和消息中心说,我收到了!而是等着我们业务处理完,我们在手动ACK告诉消息中心说,我收到了!
    NACK (不确认消息): 就是我处理业务消息的时候,出现了业务异常,导致这个消息处理失败,那么也就表示这个消息消费不成功,但是既然不成功那么我要让它回到消息队列中,那么我就 NACK ,这个消息就会自动回到队首!

    消费者 和 生产者 在交互上面,本质是和消息中心交互的!而不是 消费者 和 生产者直接交互,中间有一层MQ消息中心!

    =========================================================

    ACK 确认消息
    NACK 不确认消息

    消费端的自动ACK,手工 ACK(确认) 和 NACK(不确认, 处理失败,重新投入),

    使用场景:

    消费端进行消费的时候,如果由于业务异常我们可以进行日志的记录,然后进行补偿!

    当我们的业务异常出现,我们可以用最大努力补偿,捕获到业务异常我直接 NACK 不确认消息,让它自动进行重发,一直进行个两三次 NACK 后,如果到了第四次还是失败的,那我们就直接写入日志里面,后面进行人工干预补偿!(比如人工给这个用户增加积分)!

    NACK 后这个消息会放到消息中心的队首(也可能是队尾,忘记了,好像是队尾),那么下一次消息,就又会这条消息!

    如果由于服务器宕机等严重问题,那我们就需要手工ACK保障消费端消费成功!

    比如说,我这个消息消费了一半了,服务器突然挂了,我们MQ中心,就接收不到 ACK 也接收不到 NACK ,那么我们这个消息就会进行重发!当消息消费到一半的时候,算失败的,因为我MQ消息中心没有接受到真正的确认应答!

    也就是说,我消费者接受到消息后,自动ACK,给MQ消息中心发消息说,我收到了!但是如果我自动ACK的话,消费消息的时候失败了,那么我这个消息也就没有了,因为我消费端一拿到消息就马上给消息中心说我收到了!

    所以这个时候我需要的时候手工ACK,当消费者收到消息的时候,不自动给消息中心发消息说我收到了,而是等着消费端把消息处理成了,在给消息中心发ACK确认,这是手工ACK

    消费端的重回队列

    消费端重回队列是为了对没有处理成功的消息,把消息重新会递给Broker(消息中心)!
    一般我们在实际使用中,都会关闭重回队列,也就是设置为False

    生产者

     package com.example.rabbitmqapi.ack;
    
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * 生产者
     *
     *      在消费端的 ACK 和 重回队列
     *
     * @author weiximei
     */
    public class Producer {
    
        public static void main(String[] args) throws Exception {
            // 1. 创建ConnectionFactory, 并设置属性
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("192.168.72.138");
            factory.setPort(5672);
            factory.setVirtualHost("/");
    
            // 2. 创建连接
            Connection connection = factory.newConnection();
    
            // 3. 创建channel
            Channel channel = connection.createChannel();
    
    
            String exchangeName = "test_ack_exchange";
            String routingKey = "ack.saye";
    
            // 发送消息
            String msg = "自定义消费者, 消息发送 : Hello, weiximei";
            for (int i = 0; i < 5; i++) {
    
                // 携带额外的参数
                Map<String,Object> headerMap = new HashMap<>();
                headerMap.put("num", i);
    
                // 添加额外的属性
                AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                        .deliveryMode(2) // 是否持久化 1 不持久化,2 持久化
                        .contentEncoding("UTF-8")  // 设置字符集
                        .headers(headerMap).build();
    
                /**
                 * mandatory 如果为true,则监听器会接收到路由不可达的消息,然后进行后续处理,如果为false,那么broker端自动删除该消息!
                 */
                // exchange 表示交换机, 不设置交换机就输入空字符串, 就表示走第一个默认的交换机(AMQP default), 也就是说 routingKet 会和交换机绑定一起
                // routingKet 表示key键,发送到哪一个队列
                // mandatory 如果为true,则监听器会接收到路由不可达的消息,然后进行后续处理,如果为false,那么broker端自动删除该消息!
                // props 表示消息的其他属性 (BasicProperties)
                // body 表示消息内容
                channel.basicPublish(exchangeName, routingKey, true, null, msg.getBytes());
            }
    
            // 关闭连接
            channel.close();
            connection.close();
    
        }
    
    }
    
    
    
    

    消费者

     package com.example.rabbitmqapi.ack;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    /**
     * 消费者
     *
     *      消费端 ACK 和 重回队列
     *
     * @author weiximei
     */
    public class Consumer {
    
        public static void main(String[] args) throws Exception {
            // 1. 创建连接工厂并设置属性
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("192.168.72.138");
            factory.setPort(5672);
            factory.setVirtualHost("/");
    
            // 2. 创建连接
            Connection connection = factory.newConnection();
    
            // 3. 创建channel
            Channel channel = connection.createChannel();
    
            // 4. 声明Exchange
            String exchangeName = "test_ack_exchange";
            String exchangeType = "topic";
            String routingKey = "ack.*";
            channel.exchangeDeclare(exchangeName, exchangeType, true, false, null);
    
            // 5. 声明消息队列
            String queueName = "test_ack_queue";
            channel.queueDeclare(queueName, true, false, false, null);
    
            // 6. 绑定队列和Exchange
            channel.queueBind(queueName, exchangeName, routingKey);
    
    
            // 7. 设置消费者为自定义的消费者, 手工签收ACK,则必须 autoAck=false
            channel.basicConsume(queueName, false, new MyConsumer(channel));
    
        }
    
    }
    
    
    
    

    自定义消费

     package com.example.rabbitmqapi.ack;
    
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    
    import java.io.IOException;
    
    /**
     * 自定义消费者
     *
     * @author weiximei
     */
    public class MyConsumer extends DefaultConsumer {
    
        private Channel channel;
    
        public MyConsumer(Channel channel) {
            super(channel);
            this.channel = channel;
        }
    
        // 第一参数:consumerTag 同一个会话, consumerTag 是固定的 可以做此会话的名字, deliveryTag 每次接收消息+1,可以做此消息处理通道的名字。
        //      因此 envelope.deliveryTag 可以用来回传告诉 rabbitmq 这个消息处理成功 清除此消息(basicAck方法)。
        // 第二个参数: envelope 表示消息类型信息
        // 第三个参数:properties 表示消息路由头的其他属性等
        // 第三个参数: body 消息内容
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            System.out.println("-------------自定义消费者------------");
            System.out.println("consumerTag : " + consumerTag);
            System.out.println("envelope : " + envelope);
            System.out.println("properties : " + properties);
            System.out.println("body : " + new String(body));
    
            // 获取消息的额外参数
            Integer num = (Integer) properties.getHeaders().get("num");
            // 比如我让这个的额外参数等于 0 的时候,就进行重回队列
            if (num == 0) {
                // deliveryTag 参数是消息唯一标识
                // multiple 是否是批量的,一般为false
                // requeue 是否重回队列, true 是,false 不是
                channel.basicNack(envelope.getDeliveryTag(), false, true);
            } else {
    
                /**
                 * 手工签收, 第二个参数表示是否批量签收
                 */
                // envelope.getDeliveryTag() 是消息的唯一标识
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
    
        }
    }
    
    
    

    相关文章

      网友评论

          本文标题:消费端ACK与重回队列

          本文链接:https://www.haomeiwen.com/subject/yjcokdtx.html