美文网首页
3 发送消息并设置Confirm和Return

3 发送消息并设置Confirm和Return

作者: Finlay_Li | 来源:发表于2020-07-15 20:20 被阅读0次

    Confirm

    消息消费后的回调,对消费结果进行确认

    Return

    当消息路由不可达时,触发回调

    配置

    #--------------------------------------------rabbitmq--------------------------------------------
    #连接工厂
    spring.rabbitmq.addresses=xxxx:5677,xxx:5678
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=qwg-rabbitmq@guest
    #虚拟机
    spring.rabbitmq.virtual-host=qwg-app-dev
    #------------生产端
    # 开启:消息确认
    spring.rabbitmq.publisherConfirms= true
    # 开启:路由不可达的消息返回
    spring.rabbitmq.publisher-returns= true
    # 设置true 监听器会收到:路由不可达的消息,从而可对路由不可达的消息进行处理,保证消息的路由成功;如果为false,那么Broker会自动删除该消息
    spring.rabbitmq.template.mandatory= true
    #------------消费端
    # 手工签收
    spring.rabbitmq.listener.simple.acknowledge-mode=manual
    

    生产端

    package com.finlay.scaffold.boot;
    
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.rabbit.support.CorrelationData;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    @Component
    public class RabbitSender {
        private static final String EXCHANGE_NAME = "boot.exchange";
        private static final String ROUTING_KEY = "springboot.hello";
    //    private static final String ROUTING_KEY = "springboot.hexllo"; ----------当routing_key 或 exchange不存在时:触发ReturnCallback
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        //confirm
        final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
    
            @Override
            public void confirm(CorrelationData correlationData, boolean b, String s) {
                System.out.println("*************消息发送成功**************");
                System.out.println("ack签收结果------------>:" + b);
                System.out.println("若发生异常,异常信息------------>:" + s);
            }
        };
    
        //return
        final RabbitTemplate.ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {
    
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText,
                                        String exchange, String routingKey) {
                System.out.println("**************Return********************");
                System.out.println("replyCode:" + replyCode);
                System.out.println("replyText:" + replyText);
                System.out.println("exchange:" + exchange);
                System.out.println("routingKey:" + routingKey);
            }
        };
    
        public void sendString() {
            String msg = "rabbitmq--------->springboot--------->hello";
            rabbitTemplate.setConfirmCallback(confirmCallback);
            rabbitTemplate.setReturnCallback(returnCallback);
            rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, msg);
        }
    }
    
    

    消费端

    package com.finlay.scaffold.boot;
    
    import com.rabbitmq.client.Channel;
    import org.springframework.amqp.core.ExchangeTypes;
    import org.springframework.amqp.rabbit.annotation.*;
    import org.springframework.amqp.support.AmqpHeaders;
    import org.springframework.messaging.handler.annotation.Header;
    import org.springframework.stereotype.Component;
    
    Component
    public class RabbitReceiver {
    
        private static final String EXCHANGE_NAME = "boot.exchange";
    //    private static final String ROUTING_KEY = "springboot.#";
        private static final String ROUTING_KEY = "springboot.hello";
        private static final String QUEUE_NAME = "boot.queue";
    
        //直接通过@RabbitListener,完成QUEUE,EXCHANGE 的【声明、绑定】
        @RabbitListener(bindings = @QueueBinding(
                value = @Queue(value = QUEUE_NAME,
                        durable = "true"),
                exchange = @Exchange(value = EXCHANGE_NAME,
                        type = ExchangeTypes.TOPIC,
                        durable = "false"),
                key = ROUTING_KEY
        ))
        @RabbitHandler
        public void rec(String msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
            try {
                channel.basicQos(1);
                System.out.println("receiver: " + msg);
                channel.basicAck(tag, false);
            } catch (Exception e) {
                //不建议再放回队列,可以采用ConfirmCallback 机制进行处理
                throw new RuntimeException(e);
            }
        }
    }
    
    
    

    相关文章

      网友评论

          本文标题:3 发送消息并设置Confirm和Return

          本文链接:https://www.haomeiwen.com/subject/houjhktx.html