美文网首页RabbitMQ学习RabbitMQ
RabbitMQ笔记十六:消息确认之二(Consumer Ack

RabbitMQ笔记十六:消息确认之二(Consumer Ack

作者: 二月_春风 | 来源:发表于2017-10-22 23:44 被阅读180次

    消费确认(comsumer acknowledgements)

    broker与消费者之间的消息确认称为comsumer acknowledgements,comsumer acknowledgements机制用于解决消费者与Rabbitmq服务器之间消息可靠传输,它是在消费端消费成功之后通知broker消费端消费消息成功从而broker删除这个消息。

    RabbitMQ Java Client 实现消息确认

    自动确认

    zhihao.miao.order队列中发送一条消息

    web管控台查看
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    import java.util.concurrent.TimeUnit;
    
    public class Consumer {
        public static void main(String[] args) throws Exception{
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
    
            Connection connection = connectionFactory.newConnection();
    
            Channel channel = connection.createChannel();
    
            /**
             * basicConsume方法的第二个参数是boolean类型,true表示消息一旦投递出去就自动确认,而false表示需要自己手动去确认
             * 自动确认有丢消息的可能,因为如果消费端消费逻辑抛出异常,也就是消费端没有处理成功这条消息,那么就相当于丢失了消息
             * 设置了false,表示需要人为手动的去确定消息,只有消费者将消息消费成功之后给与broker人为确定才进行消息确认
             * 这边也有个问题就是如果由于程序员自己的代码的原因造成人为的抛出异常,人工确认那么消息就会一直重新入队列,一直重发?
             */
    
            String consumerTag = channel.basicConsume("zhihao.miao.order",true,new SimpleConsumer(channel));
            System.out.println(consumerTag);
    
            TimeUnit.SECONDS.sleep(30);
    
            channel.close();
            connection.close();
        }
    }
    

    消费具体逻辑

    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    
    import java.io.IOException;
    
    public class SimpleConsumer extends DefaultConsumer {
    
        public SimpleConsumer(Channel channel){
            super(channel);
        }
    
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            System.out.println(consumerTag);
            System.out.println("-----收到消息了---------------");
            System.out.println("消息属性为:"+properties);
            System.out.println("消息内容为:"+new String(body));
            try
            {
                int i = 1/0;
                System.out.println(i);
            }catch (Exception e){
                e.printStackTrace();
            }
    
        }
    }
    

    控制台打印:

    amq.ctag-6_GQmh1tMeooWSiuqUmz0Q
    java.lang.ArithmeticException: / by zero
    -----收到消息了---------------
        at com.zhihao.test.day04.SimpleConsumer.handleDelivery(SimpleConsumer.java:29)
    amq.ctag-6_GQmh1tMeooWSiuqUmz0Q
        at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:149)
        at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:100)
    消息属性为:#contentHeader<basic>(content-type=json, content-encoding=null, headers={}, delivery-mode=2, priority=0, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    消息内容为:{"orderId":"abba05db-050e-4b1a-97f1-c469b23ca27b","createTime":"2017-10-22T21:02:41.861","price":100.0}
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
    

    此时可以看到消费端抛出了异常,但是我们发现这条消息也已经消费掉了,此时如果消费端消费逻辑使用spring进行管理的话消费端业务逻辑会进行回滚,这也就造成了实际意义的消息丢失。

    web管控台

    手动确认

    自动确认会造成实际意义上的消息丢失。

    将basicConsume方法的第二个参数改为false,表示人工的进行消息确认,如果消费者正在监听队列,那么此时消息进入Unacked,而如果消费者停掉服务,那么消息的状态又变成Ready了。这个机制表明了消息必须是ack确认之后才会在server中删除掉。

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    import java.util.concurrent.TimeUnit;
    
    public class Consumer {
        public static void main(String[] args) throws Exception{
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
    
            Connection connection = connectionFactory.newConnection();
    
            Channel channel = connection.createChannel();
    
            //手动确认
            String consumerTag = channel.basicConsume("zhihao.miao.order",false,new SimpleConsumer(channel));
            System.out.println(consumerTag);
    
            TimeUnit.SECONDS.sleep(30);
    
            channel.close();
            connection.close();
        }
    }
    

    消费具体逻辑

    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    
    import java.io.IOException;
    import java.util.concurrent.TimeUnit;
    
    public class SimpleConsumer extends DefaultConsumer {
    
        public SimpleConsumer(Channel channel){
            super(channel);
        }
    
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException{
            System.out.println(consumerTag);
            System.out.println("-----收到消息了--------------");
    
            System.out.println("消息属性为:"+properties);
            System.out.println("消息内容为:"+new String(body));
    
            try {
                TimeUnit.SECONDS.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            this.getChannel().basicAck(envelope.getDeliveryTag(),false);
            System.out.println("消息消费成功");
        }
    }
    
    此时消息已经发送给消费者,但是消费者还没有进行手动确认

    发送一个header中包含error属性的消息,

    发送一个header中包含error属性的消息

    改造消费逻辑

    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    
    import java.io.IOException;
    import java.util.concurrent.TimeUnit;
    
    
    public class SimpleConsumer extends DefaultConsumer {
    
        public SimpleConsumer(Channel channel){
            super(channel);
        }
    
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException{
            System.out.println(consumerTag);
            System.out.println("-----收到消息了--------------");
    
            if(properties.getHeaders().get("error") != null){
                try {
                    TimeUnit.SECONDS.sleep(15);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
              
                System.out.println("nack");
                this.getChannel().basicNack(envelope.getDeliveryTag(),false,true);
    
                return;
            }
            System.out.println("消息属性为:"+properties);
            System.out.println("消息内容为:"+new String(body));
    
    
            try {
                TimeUnit.SECONDS.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            this.getChannel().basicAck(envelope.getDeliveryTag(),false);
            System.out.println("消息消费成功");
        }
    }
    

    控制台打印,说明该消息一直重新入队列然后一直重新消费

    amq.ctag-U5cHBcnxa5dhkYXjd1LFgQ
    amq.ctag-U5cHBcnxa5dhkYXjd1LFgQ
    -----收到消息了--------------
    nack
    amq.ctag-U5cHBcnxa5dhkYXjd1LFgQ
    -----收到消息了--------------
    nack
    amq.ctag-U5cHBcnxa5dhkYXjd1LFgQ
    -----收到消息了--------------
    nack
    amq.ctag-U5cHBcnxa5dhkYXjd1LFgQ
    -----收到消息了--------------
    nack
    

    消费端也可以拒绝消息,

    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    
    import java.io.IOException;
    import java.util.concurrent.TimeUnit;
    
    public class SimpleConsumer extends DefaultConsumer {
    
        public SimpleConsumer(Channel channel){
            super(channel);
        }
    
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException{
            System.out.println(consumerTag);
            System.out.println("-----收到消息了--------------");
    
            if(properties.getHeaders().get("error") != null){
                try {
                    TimeUnit.SECONDS.sleep(15);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
               
                //这个api也支持拒绝消息消费,第二个参数表示是否重新入队列
                this.getChannel().basicReject(envelope.getDeliveryTag(),false);
                System.out.println("消息无法消费,拒绝消息");
                return;
            }
            System.out.println("消息属性为:"+properties);
            System.out.println("消息内容为:"+new String(body));
    
    
            try {
                TimeUnit.SECONDS.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            this.getChannel().basicAck(envelope.getDeliveryTag(),false);
            System.out.println("消息消费成功");
        }
    }
    

    控制台打印,因为设置了不重新入队列,所以不再重新发消息了:

    amq.ctag-kiy_49AkC3f4qRkqCMujrw
    amq.ctag-kiy_49AkC3f4qRkqCMujrw
    -----收到消息了--------------
    消息无法消费,拒绝消息
    

    总结
    消费端的消息确认分为二个步骤,

    • 在channel.basicConsume指定为手动确认。
    • 具体根据业务逻辑来进行判断什么是ack什么时候nack(又分为要不要重新requeue)

    这边有个问题就是nack时候或者reject时候重新入队列如果业务端因为代码逻辑问题一直重发怎样去设置一个次数值?
    我的设想就是设置一个重新发送的递增值,这个值与消息id对应,去处理解决它。或者在redis或者memcache等其他保存方式然后记录这个重发次数。
    How do I set a number of retry attempts in RabbitMQ?

    Spring AMQP消费端实现消息确认

    自动确认

    配置类

    import org.springframework.amqp.core.AcknowledgeMode;
    import org.springframework.amqp.core.MessageListener;
    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitAdmin;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import java.util.concurrent.TimeUnit;
    
    @Configuration
    public class MQConfig {
    
        @Bean
        public ConnectionFactory connectionFactory(){
            CachingConnectionFactory factory = new CachingConnectionFactory();
            factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
            return factory;
        }
    
        @Bean
        public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
            RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
            return rabbitAdmin;
        }
    
        @Bean
        public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
            RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
            return rabbitTemplate;
        }
    
        @Bean
        public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
            container.setConnectionFactory(connectionFactory);
            container.setQueueNames("zhihao.miao.order");
            /**
             * 自动确认涉及到一个问题就是如果在消息消息的时候抛出异常,消息处理失败,但是因为自动确认而server将该消息删除了。
             * NONE表示自动确认
             */
            container.setAcknowledgeMode(AcknowledgeMode.NONE);
            container.setMessageListener((MessageListener) message -> {
                System.out.println("====接收到消息=====");
                System.out.println(new String(message.getBody()));
    
                try {
                    TimeUnit.SECONDS.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                //相当于自己的一些消费逻辑抛错误
                throw new NullPointerException("consumer fail");
    
            });
            return container;
        }
    }
    

    应用启动类

    import org.springframework.context.annotation.AnnotationConfigApplicationContext;
    import org.springframework.context.annotation.ComponentScan;
    
    import java.util.concurrent.TimeUnit;
    
    @ComponentScan
    public class Application {
    
        public static void main(String[] args) throws Exception{
            AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
    
            TimeUnit.SECONDS.sleep(100);
            System.out.println("message container startup");
    
            context.close();
        }
    }
    

    控制台打印:

    ====接收到消息=====
    {"orderId":"d232eea5-35ae-4534-80f4-cfb31f49178f","createTime":"2017-10-22T22:11:34.239","price":100.0}
    十月 22, 2017 10:11:58 下午 org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler handleError
    警告: Execution of Rabbit message listener failed.
    org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
    

    Web控制台上显示消息消费确认也成功。问题还是自动确认会造成事实上的消息丢失。

    手动确认

    import org.springframework.amqp.core.AcknowledgeMode;
    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
    import org.springframework.amqp.rabbit.core.RabbitAdmin;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import java.util.concurrent.TimeUnit;
    
    @Configuration
    public class MQConfig {
    
        @Bean
        public ConnectionFactory connectionFactory(){
            CachingConnectionFactory factory = new CachingConnectionFactory();
            factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
            return factory;
        }
    
        @Bean
        public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
            RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
            return rabbitAdmin;
        }
    
        @Bean
        public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
            RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
            return rabbitTemplate;
        }
    
        @Bean
        public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
            container.setConnectionFactory(connectionFactory);
            container.setQueueNames("zhihao.miao.order");
    
            container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
            container.setMessageListener((ChannelAwareMessageListener) (message, channel) -> {
                System.out.println("====接收到消息=====");
                System.out.println(new String(message.getBody()));
                TimeUnit.SECONDS.sleep(10);
                if(message.getMessageProperties().getHeaders().get("error") == null){
                    channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
                    System.out.println("消息已经确认");
                }else {
                    //channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
                    channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
                    System.out.println("消息拒绝");
                }
    
            });
            return container;
        }
    }
    

    总结

    AcknowledgeMode.NONE:自动确认,等效于autoAck=true
    AcknowledgeMode.MANUAL:手动确认,等效于autoAck=false,此时如果要实现ack和nack回执的话,使用ChannelAwareMessageListener监听器处理。

    AcknowledgeMode.AUTO的使用

    我们发现AcknowledgeMode除了AcknowledgeMode.NONEAcknowledgeMode.MANUAL常量值之外还有一个AcknowledgeMode.AUTO的常量。

    配置类

    import org.springframework.amqp.AmqpRejectAndDontRequeueException;
    import org.springframework.amqp.ImmediateAcknowledgeAmqpException;
    import org.springframework.amqp.core.AcknowledgeMode;
    import org.springframework.amqp.core.MessageListener;
    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitAdmin;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class MQConfig {
    
        @Bean
        public ConnectionFactory connectionFactory(){
            CachingConnectionFactory factory = new CachingConnectionFactory();
            factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
            return factory;
        }
    
        @Bean
        public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
            RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
            return rabbitAdmin;
        }
    
        @Bean
        public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
            RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
            return rabbitTemplate;
        }
    
        @Bean
        public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
            container.setConnectionFactory(connectionFactory);
            container.setQueueNames("zhihao.miao.order");
            container.setAcknowledgeMode(AcknowledgeMode.AUTO);
            container.setMessageListener((MessageListener) (message) -> {
                System.out.println("====接收到消息=====");
                System.out.println(new String(message.getBody()));
                //抛出NullPointerException异常则重新入队列
                //throw new NullPointerException("消息消费失败");
                //当抛出的异常是AmqpRejectAndDontRequeueException异常的时候,则消息会被拒绝,且requeue=false
                //throw new AmqpRejectAndDontRequeueException("消息消费失败");
                //当抛出ImmediateAcknowledgeAmqpException异常,则消费者会被确认
                throw new ImmediateAcknowledgeAmqpException("消息消费失败");
    
            });
            return container;
        }
    }
    

    应用启动类

    import org.springframework.context.annotation.AnnotationConfigApplicationContext;
    import org.springframework.context.annotation.ComponentScan;
    
    import java.util.concurrent.TimeUnit;
    
    @ComponentScan
    public class Application {
    
        public static void main(String[] args) throws Exception{
            AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
    
            TimeUnit.SECONDS.sleep(100);
            System.out.println("message container startup");
    
            context.close();
        }
    }
    

    AcknowledgeMode.AUTO 根据方法的执行情况来决定是否确认还是拒绝(是否重新入queue)

    • 如果消息成功被消费(成功的意思就是在消费的过程中没有抛出异常),则自动确认。

    1)当抛出AmqpRejectAndDontRequeueException异常的时候,则消息会被拒绝,且requeue=false(不重新入队列)
    2)当抛出ImmediateAcknowledgeAmqpException异常,则消费者会被确认
    3)其他的异常,则消息会被拒绝,且requeue=true(如果此时只有一个消费者监听该队列,则有发生死循环的风险,多消费端也会造成资源的极大浪费,这个在开发过程中一定要避免的)。可以通过setDefaultRequeueRejected(默认是true)去设置,

    源码分析

    org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainerdoReceiveAndExecute方法,

    private boolean doReceiveAndExecute(BlockingQueueConsumer consumer) throws Throwable { //NOSONAR
    
        Channel channel = consumer.getChannel();
    
        for (int i = 0; i < this.txSize; i++) {
    
            logger.trace("Waiting for message from consumer.");
            Message message = consumer.nextMessage(this.receiveTimeout);
            if (message == null) {
                break;
            }
            try {
               //具体的逻辑,具体执行Listener
                executeListener(channel, message);
            }
            //当ImmediateAcknowledgeAmqpException异常的时候打印日志然后直接break
            catch (ImmediateAcknowledgeAmqpException e) {
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("User requested ack for failed delivery: "
                            + message.getMessageProperties().getDeliveryTag());
                }
                break;
            }
            catch (Throwable ex) { //NOSONAR
                if (causeChainHasImmediateAcknowledgeAmqpException(ex)) {
                    if (this.logger.isDebugEnabled()) {
                        this.logger.debug("User requested ack for failed delivery: "
                                + message.getMessageProperties().getDeliveryTag());
                    }
                    break;
                }
                if (this.transactionManager != null) {
                    if (this.transactionAttribute.rollbackOn(ex)) {
                        RabbitResourceHolder resourceHolder = (RabbitResourceHolder) TransactionSynchronizationManager
                                .getResource(getConnectionFactory());
                        if (resourceHolder != null) {
                            consumer.clearDeliveryTags();
                        }
                        else {
                            /*
                             * If we don't actually have a transaction, we have to roll back
                             * manually. See prepareHolderForRollback().
                             */
                            consumer.rollbackOnExceptionIfNecessary(ex);
                        }
                        throw ex; // encompassing transaction will handle the rollback.
                    }
                    else {
                        if (this.logger.isDebugEnabled()) {
                            this.logger.debug("No rollback for " + ex);
                        }
                        break;
                    }
                }
                else {
                   //进入这边
                    consumer.rollbackOnExceptionIfNecessary(ex);
                    throw ex;
                }
            }
        }
    
        return consumer.commitIfNecessary(isChannelLocallyTransacted(channel));
    
    }
    

    进入rollbackOnExceptionIfNecessary方法

    public void rollbackOnExceptionIfNecessary(Throwable ex) throws Exception {
    
      //当ack机制为AUTO的时候
        boolean ackRequired = !this.acknowledgeMode.isAutoAck() && !this.acknowledgeMode.isManual();
        try {
            if (this.transactional) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Initiating transaction rollback on application exception: " + ex);
                }
                RabbitUtils.rollbackIfNecessary(this.channel);
            }
            if (ackRequired) {
               //是否入队列,shouldRequeue就是具体的入队列和不入队列的判断
                boolean shouldRequeue = RabbitUtils.shouldRequeue(this.defaultRequeuRejected, ex, logger);
                for (Long deliveryTag : this.deliveryTags) {
                    // With newer RabbitMQ brokers could use basicNack here...
                    //执行拒绝策略
                    this.channel.basicReject(deliveryTag, shouldRequeue);
                }
                if (this.transactional) {
                    // Need to commit the reject (=nack)
                    RabbitUtils.commitIfNecessary(this.channel);
                }
            }
        }
        catch (Exception e) {
            logger.error("Application exception overridden by rollback exception", ex);
            throw e;
        }
        finally {
            this.deliveryTags.clear();
        }
    }
    

    是否入队列的判断(shouldRequeue

    public static boolean shouldRequeue(boolean defaultRequeueRejected, Throwable throwable, Log logger) {
        boolean shouldRequeue = defaultRequeueRejected ||
                throwable instanceof MessageRejectedWhileStoppingException;
        Throwable t = throwable;
        while (shouldRequeue && t != null) {
           //如果抛出的异常是AmqpRejectAndDontRequeueException的时候,不入队列
            if (t instanceof AmqpRejectAndDontRequeueException) {
                shouldRequeue = false;
            }
            t = t.getCause();
        }
        if (logger.isDebugEnabled()) {
            logger.debug("Rejecting messages (requeue=" + shouldRequeue + ")");
        }
        return shouldRequeue;
    }
    

    container.setDefaultRequeueRejected(false);,那么消息就不会重新入队列,只会拒绝一次。

    @Bean
    public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory){
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames("zhihao.miao.order");
        //自动确认涉及到一个问题就是如果在消息消息的时候抛出异常,消息处理失败,但是因为自动确认而server将该消息删除了。
        //NONE表示自动确认
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);
        container.setDefaultRequeueRejected(false);
        container.setMessageListener((MessageListener) (message) -> {
            System.out.println("====接收到消息=====");
            System.out.println(new String(message.getBody()));
            throw new NullPointerException("消息消费失败");
            //throw new AmqpRejectAndDontRequeueException("消息消费失败");
            //throw new ImmediateAcknowledgeAmqpException("消息消费失败");
    
        });
        return container;
    }
    

    使用@RabbitListener注解监听队列

    设置确认模式是通过在容器中设置RabbitListenerContainerFactory实例的setAcknowledgeMode方法来设定。

    配置:

    import org.springframework.amqp.core.AcknowledgeMode;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageProperties;
    import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
    import org.springframework.amqp.support.converter.MessageConversionException;
    import org.springframework.amqp.support.converter.MessageConverter;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class ConsumerConfig {
    
        @Bean
        public ConnectionFactory connectionFactory(){
            CachingConnectionFactory factory = new CachingConnectionFactory();
            factory.setUri("amqp://zhihao.miao:123456@192.168.1.131:5672");
            return factory;
        }
    
        @Bean
        public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
            SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
            factory.setConnectionFactory(connectionFactory);
            //默认的确认模式是AcknowledgeMode.AUTO
            factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
            return factory;
        }
    }
    

    处理器:

    import com.rabbitmq.client.Channel;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.amqp.support.AmqpHeaders;
    import org.springframework.messaging.handler.annotation.Header;
    import org.springframework.stereotype.Component;
    
    import java.io.IOException;
    import java.util.concurrent.CountDownLatch;
    
    @Component
    public class MessageHandler {
    
        @RabbitListener(queues ="zhihao.miao.order")
        public void handleMessage(byte[] bytes, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
            System.out.println("====消费消息===handleMessage");
            System.out.println(new String(bytes));
            channel.basicAck(tag,false);
        }
    }
    

    应用启动类:

    import org.springframework.amqp.rabbit.annotation.EnableRabbit;
    import org.springframework.context.annotation.AnnotationConfigApplicationContext;
    import org.springframework.context.annotation.ComponentScan;
    
    import java.util.concurrent.TimeUnit;
    
    @EnableRabbit
    @ComponentScan
    public class Application {
        public static void main(String[] args) throws Exception{
            AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Application.class);
            System.out.println("rabbit service startup");
            TimeUnit.SECONDS.sleep(3000);
            context.close();
        }
    }
    

    可靠消息总结

    实际使用mq的实例,每段时间定期的给经常订早餐的推送短信(上新品)。
    登录短信(也是使用消息中间件)
    下单的时候,使用消息中间件发送到配送系统(消息不能丢失)。

    做到消息不能丢失,我们就要实现可靠消息,做到这一点,我们要做到下面二点:

    一:持久化
    1: exchange要持久化
    2: queue要持久化
    3: message要持久化
    二:消息确认
    1: 启动消费返回(@ReturnList注解,生产者就可以知道哪些消息没有发出去)
    2:生产者和Server(broker)之间的消息确认。
    3: 消费者和Server(broker)之间的消息确认。

    对于重要的消息,要结合本地的消息表才能上生产。

    参考资料Consumer Acknowledgements and Publisher Confirms

    相关文章

      网友评论

      • 4097f80e6815:1: 启动消费返回(@ReturnList注解,生产者就可以知道哪些消息没有发出去)
        请问@ReturnList是什么注解???
        二月_春风:可以看我这篇博客,https://www.jianshu.com/p/b8416617e869

      本文标题:RabbitMQ笔记十六:消息确认之二(Consumer Ack

      本文链接:https://www.haomeiwen.com/subject/cdgwextx.html