美文网首页javaWeb学习
Spring boot + RabbitMQ 之 消息确认

Spring boot + RabbitMQ 之 消息确认

作者: _大叔_ | 来源:发表于2019-07-22 09:15 被阅读0次

    RabbitMQ消息 确认分为两种:一是生产确认,二是消费确认
    RabbitMQ本身支持两种 确认方式:一是事务确认,二是ACK确认

    这里直接介绍Spring Boot+RabbitMQ 的消息确认(ACK)

    一:生产确认

    生产者确认需要在生产的地方实现 RabbitTemplate.ConfirmCallback

    @Service
    public class PersonalService implements RabbitTemplate.ConfirmCallback{
    
        @Autowired
        public UserInfoDao userInfoDao;
        @Autowired
        public MailTools mailTools;
        @Autowired
        public LoginService loginService;
        public RabbitTemplate rabbitTemplate;
        
        static Logger log = Logger.getLogger(PersonalService.class);
    
        /**
         * 需要通过生产者的构造器去注入RabbitTemplate,并设置他 回调确认对象为 当前对象。
         */
        public PersonalService(RabbitTemplate rabbitTemplate){
            this.rabbitTemplate=rabbitTemplate;
            this.rabbitTemplate.setConfirmCallback(this);
        }
    }
    

    实现了 ConfirmCallback 之后需要实现 confirm 方法

        /**
         * 消息发送到队列回调该方法
         * correlationData : 发送消息时给的id
         * cause : 会返回错误信息,正确为null
         * ack:是否正确发送
         */
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            // TODO Auto-generated method stub
            System.out.println("confirm --> "+correlationData.getId()+" ->"+ack+" ->"+cause);
        }
    
    

    二:消费确认

    在Rabbitmq+Springboot中,消费者的实现方式为注解方式:

    @Component
    public class MessageReceiver {
    
        @RabbitListener(queues = AmqpConfiguration.QUEUE)
        //@RabbitHandler
        public void receive(String hello, Channel channel, Message message){
            // 限流处理:消息体大小不限制,每次限制消费一条,只作用于该Consumer层,不作用于Channel
            channel.basicQos(0, 1, false);//限制于消费级别
    
            String messsageText = new String(message.getBody());
            System.out.println("[receiver] receive message : "+ messsageText);
            try {
                if(validate(messsageText)){
                    System.out.println("[receiver] confirm");
                    channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);//确认消息接收
                }else{
                    System.out.println("[receiver] reject");
                    channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);//拒绝消息接收
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        private boolean validate(String messsageText) {
            return !(messsageText!=null && messsageText.indexOf("fuck")>-1);
        }
    }
    

    -> @RabbitListener:监听的队列
    -> @RabbitHandler:但@RabbitListener注解在类上时,需要使用@RabbitHandler来指明调用的方法。
    -> void basicAck(long deliveryTag, boolean multiple) throws IOException; 确认消息接收 deliveryTag:该消息的index,multiple:是否批量.true:将一次性ack所有小于deliveryTag的消息。
    -> channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true); 拒绝消息接收 deliveryTag:该消息的index,multiple:是否批量.true:将一次性拒绝所有小于deliveryTag的消息,requeue:被拒绝的是否重新入队列。
    -> channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false); 拒绝消息接收 deliveryTag:该消息的index,requeue:被拒绝的是否重新入队列,channel.basicNack 与 channel.basicReject 的区别在于basicNack可以拒绝多条消息,而basicReject一次只能拒绝一条消息。
    -> chanel.basicQos(int prefetchSize, int prefetchCount, boolean global) 消息限流的功能,防止生产过多,导致消费者消费吃力的情况;
    prefetchSize: 0表示对消息的大小无限制,单位为(B-字节)
    prefetchCount:会告诉RabbitMQ不要同时给一个消费者推送多于N个消息,即一旦有N个消息还没有ack,则该consumer将 阻塞 掉,直到有消息ack。0为无上限
    global:true\false 是否将上面设置应用于channel,简单点说,就是上面限制是channel级别的还是consumer级别。


    该图片来自RabbitMq实战.pdf

    相关文章

      网友评论

        本文标题:Spring boot + RabbitMQ 之 消息确认

        本文链接:https://www.haomeiwen.com/subject/citblctx.html