美文网首页收藏RabbitMQ
RabbitMQ高级整合应用-5、RabbitMQ与Spring

RabbitMQ高级整合应用-5、RabbitMQ与Spring

作者: 那钱有着落吗 | 来源:发表于2021-12-27 15:03 被阅读0次

    这个帖子我们就可以使用springboot经典的配置文件的方式来实现消息的确认和失败回调


    image.png

    消息确认和失败回调 代码示例

    配置文件

    spring.application.name=rabbit-sample
    server.port=63001
    
    
    #rabbitMQ连接字符串
    spring.rabbitmq.host=localhost
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=guest
    spring.rabbitmq.virtual-host = /
    spring.rabbitmq.connection-timeout=15000
    
    spring.rabbitmq.publisher-confirms=true
    spring.rabbitmq.publisher-returns=true
    spring.rabbitmq.template.mandatory=true
    

    消息发送类,我们在消息发送类中定义了消息的确认方法,消息的失败回调方法:

    @Component
    public class RabbitSender {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
    
        final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                System.out.println("correlationData:"+correlationData);
                System.out.println("ack:"+ack);
                if(!ack){
                    System.out.println("异常处理机制:-----");
                }
            }
        };
    
        final RabbitTemplate.ReturnsCallback returnsCallback = new RabbitTemplate.ReturnsCallback() {
            @Override
            public void returnedMessage(ReturnedMessage returned) {
                System.out.println("return:"+returned);
            }
        };
    
        public void send(Object message, Map<String,Object> properties) throws Exception{
            MessageHeaders messageHeaders = new MessageHeaders(properties);
            Message<Object> msg = MessageBuilder.createMessage(message,messageHeaders);
            rabbitTemplate.setConfirmCallback(confirmCallback);
            rabbitTemplate.setReturnsCallback(returnsCallback);
    
            // CorrelationData cd = new CorrelationData();
            // cd.setId("444555666");//全局唯一id,可以加时间戳
            rabbitTemplate.convertAndSend("exchange-boot","rabbit.a",msg);
        }
    
    }
    

    测试类:

    @Test
        public void send1() throws Exception{
            Map<String,Object> messageProperties = new HashMap<>();
            messageProperties.put("number",1234);
            messageProperties.put("sendTime",new Date());
            rabbitSender.send("this is a msg!",messageProperties);
        }
    

    消费端配置


    image.png

    消费端@RabbitListener注解使用

    在前面的基础上加上下面配置:

    spring.rabbitmq.listener.simple.acknowledge-mode=manual
    spring.rabbitmq.listener.simple.concurrency = 5
    spring.rabbitmq.listener.simple.max-concurrency = 10
    

    创建一个消息接收类:

    
    @Component
    public class RabbitReceiver {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @RabbitListener(
                 bindings = @QueueBinding(
                         value = @Queue(value = "queue-boot",durable = "true"),
                         exchange = @Exchange(value="exchange-boot",durable = "true",
                                 type="topic",ignoreDeclarationExceptions ="true"),
                         key = "spring.*"
                 )
        )
        @RabbitHandler
        public void onMessage(Message message, Channel channel) throws Exception{
            System.out.println("-------");
            System.out.println("消费端payload:"+message.getPayload());
            Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
            //手动ACK
            channel.basicAck(deliveryTag,false);
        }
    }
    

    可以看到我们使用注解的方式创建了一个exchange,一个队列,然后还建立了绑定关系,然后注解的方法就监听这个队列,接收消息并手动ack。

    使用配置文件配置exchange以及队列等信息

    直接在代码中写死是不灵活的方式,所以我们在配置文件写好,然后以变量的方式注入到rabbit的注解中以达到创建队列的方式。

    在接收端我们也可以直接接收java对象,这种方式更加便捷:

    相关文章

      网友评论

        本文标题:RabbitMQ高级整合应用-5、RabbitMQ与Spring

        本文链接:https://www.haomeiwen.com/subject/csqyqrtx.html