美文网首页
Spring Boot整合RabbitMQ

Spring Boot整合RabbitMQ

作者: GIT提交不上 | 来源:发表于2020-05-08 20:31 被阅读0次

一、RabbitMQ简介

  RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。

百度百科-RabbitMQ

图1-1 RabbitMQ主要组件.png 图1-2 RabbitMQ主要组件.png

  相关组件介绍请参考:

SpringBoot与消息(RabbitMQ)
RabbitMQ

  Exchange分发消息时,共四种类型:direct、fanout、topic、headers,相关区别请参考:

RabbitMQ(一):RabbitMQ快速入门

二、Docker安装RabbitMQ

  本文使用阿里云的主机进行演示,使用docker安装RabbitMQ。(安全组开放5672和15672端口)

图2-1 安全组端口配置.png

下载RabbitMQ镜像
docker pull rabbitmq:3-management

启动容器,设置用户名、密码(yourusername /yourpassword),不设置默认为guest
docker run -d --hostname my-rabbit --name rabbit -e RABBITMQ_DEFAULT_USER=yourusername -e RABBITMQ_DEFAULT_PASS=yourpassword -p 15672:15672 -p 5672:5672 rabbitmq:3-management

进入容器
docker exec -it rabbit bash
开启插件
rabbitmq-plugins enable rabbitmq_management

  至此就能在本地通过IP+端口号访问RabbitMQ。

三、Spring Boot整合RabbitMQ

示例源码见:https://github.com/just-right/rabbitmq

3.1 注入Queue、Exchange、Binding

  手动注入队列、交换器和绑定组件。

@Bean
public Queue queue1() {
    return new Queue("queue1");
}
@Bean
public Queue queue2() {
    return new Queue("queue2");
}
@Bean
public DirectExchange directExchange() {
    return new DirectExchange("directExchange");
}
@Bean
public FanoutExchange fanoutExchange() {
    return new FanoutExchange("fanoutExchange");
}
@Bean
public TopicExchange topicExchange() {
    return new TopicExchange("topicExchange");
}

//queue1和queue2绑定fanoutExchange
@Bean
public Binding bindingExchangeAndQueue1(Queue queue1, FanoutExchange fanoutExchange) {
    return BindingBuilder.bind(queue1).to(fanoutExchange);
}
@Bean
public Binding bindingExchangeAndQueue2(Queue queue2, FanoutExchange fanoutExchange) {
    return BindingBuilder.bind(queue2).to(fanoutExchange);
}

//queue1和queue2绑定topicExchange
@Bean
public Binding bindingExchangeAndQueue3(@Qualifier("queue1") Queue queue1, TopicExchange topicExchange) {
    return BindingBuilder.bind(queue1).to(topicExchange).with("topic.match");
}
//#匹配0个或多个 *匹配一个
@Bean
public Binding bindingExchangeAndQueue4(@Qualifier("queue2") Queue queue2, TopicExchange topicExchange) {
    return BindingBuilder.bind(queue2).to(topicExchange).with("topic.#");
}

  自定义序列化:

//自定义序列化
@Bean
public MessageConverter getMessageConverter() {
    return new Jackson2JsonMessageConverter();
}

  发送与接收消息:发送消息调用RabbitTemplate的消息发送方法,结合@RabbitListener注解接收消息。

3.2 AmqpAdmin使用

  使用AmqpAdmin的declare相关方法可以创建队列、交换器和绑定组件,其他操作请查询对应的API。

@Resource
private AmqpAdmin amqpAdmin;

amqpAdmin.declareQueue(new Queue(queueName));
amqpAdmin.declareExchange(exchange);
amqpAdmin.declareBinding(new Binding(queueName,Binding.DestinationType.QUEUE, fanoutExchangeName, null, null));

RabbitMQ整合Springboot

四、消息确认机制(ACK)

  yaml添加失败返回、发送确认等配置:

spring:
  rabbitmq:
    publisher-returns: true #失败返回
    publisher-confirms: true #发送确认
    listener:
      simple:
        acknowledge-mode: manual #手动应答
        retry:
          enabled: true #重试
          max-attempts: 3 #重发次数--无效
      direct:
        acknowledge-mode: manual

  注入RabbitTemplate,配置ReturnCallback和ConfirmCallback方法。

@Bean
public RabbitTemplate getRabbitTemplate(ConnectionFactory connectionFactor) {
    Logger log = LoggerFactory.getLogger(RabbitTemplate.class);
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactor);
    // 消息发送失败返回到队列中, yml需要配置 publisher-returns: true
    rabbitTemplate.setMandatory(true);

    // 消息返回, yml需要配置 publisher-returns: true
    rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
        String correlationId = message.getMessageProperties().getCorrelationId();
        log.info("消息:{} 发送失败, 应答码:{} 原因:{} 交换机: {}  路由键: {}", correlationId, replyCode, replyText, exchange, routingKey);
    });

    // 消息确认, yml需要配置 publisher-confirms: true
    rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
        if (ack) {
            log.info("消息发送到exchange成功");
        } else {
            log.info("消息发送到exchange失败,原因: {}", cause);
        }
    });
    return rabbitTemplate;
}

  配置消息接受者,调用basicAck方法手动确认,如果发生异常根据情况判断重发还是丢弃消息。

@RabbitListener(queues = "queue1")
public void getMsg(String msg,Message message, Channel channel) throws IOException {
    try {
       int a = 1 / 0;
       //false只确认当前一个消息收到,true确认所有consumer获得的消息
       channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    } catch (Exception e) {
       //ack返回false,并重新回到队列--注意次数限制 -- 最后一个参数为false则摒弃
       logger.info("消息接收失败,重发!");
       channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
       //拒绝消息
       //channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
    }
}

springboot整合rabbitmq实现消息确认机制
SpringBoot集成RabbitMQ消息队列搭建与ACK消息确认入门
rabbitmq监控之消息确认ack

五、延迟队列 & 死信队列

  使用延迟队列和死信队列完成消息延迟发送和消息重复发送需求。

5.1 消息延迟发送

  消息的TTL:消息的存活时间(Time To Live),当消息在队列中存在的时间超过这个设定值之后,系统会认为这个消息死了,故称为"死信"。
  死信Exchange:死信最后被转发到的路由地址。(和普通Exchange相同)

图5-1 死信Exchange流程.png

  配置延迟交换器、延迟队列、延时绑定。核心参数如下:

x-dead-letter-exchange //死信交换器
x-dead-letter-routing-key //死信路由键值
x-message-ttl //过期时间

//延迟队列
@Bean
public Queue delayQueue() {
    return QueueBuilder.durable(IRabbitMQConst.DELAY_QUEUE)
    // DLX,dead letter发送到的exchange ,设置死信队列交换器到处理交换器
    .withArgument(IRabbitMQConst.DEAD_LETTER_EXCHANGE_ARGUMENT, IRabbitMQConst.DEAD_LETTER_EXCHANGE)
    // dead letter携带的routing key,配置处理队列的路由key
     .withArgument(IRabbitMQConst.DEAD_LETTER_ROUTEKEY_ARGUMENT,IRabbitMQConst.DEAD_LETTER_ROUTEKEY)
    // 设置过期时间
     .withArgument(IRabbitMQConst.MESSAGE_TTL_ARGUMENT, IRabbitMQConst.EXPIRE_TIME)
     .build();
}
//延迟交换器-主题
@Bean
public TopicExchange delayExchange() {
    return new TopicExchange(IRabbitMQConst.DELAY_EXCHANGE);
}
//延迟绑定
@Bean
public Binding delayBinding(Queue delayQueue, TopicExchange delayExchange) {
    return BindingBuilder.bind(delayQueue).to(delayExchange).with(IRabbitMQConst.DELAY_ROUTEKEY);
}

  配置死信交换器、死信队列、死信绑定。

//死信队列
@Bean
public Queue deadLetterQueue(){
    return QueueBuilder.durable(IRabbitMQConst.DEAD_LETTER_QUEUE).build();
}
//死信交换器-主题
@Bean
public TopicExchange deadLetterExchange(){
    return new TopicExchange(IRabbitMQConst.DEAD_LETTER_EXCHANGE);
}
//死信绑定
@Bean
public Binding deadLetterBinding(Queue deadLetterQueue, TopicExchange deadLetterExchange) {
    return BindingBuilder.bind(deadLetterQueue).to(deadLetterExchange).with(IRabbitMQConst.DEAD_LETTER_ROUTEKEY);
}

  延迟发送消息测试,消息和队列都可以设置过期时间,取两者最小值

@GetMapping(value = "delay")
public void delayTest() {
    System.out.println("消息已发出:" + LocalDateTime.now().toString());
    rabbitTemplate.convertAndSend(IRabbitMQConst.DELAY_EXCHANGE, IRabbitMQConst.DELAY_ROUTEKEY, "hello world!", message -> {
    // 设置延迟毫秒值 -- 已消息和队列设置的最小值为准
    message.getMessageProperties().setExpiration(String.valueOf(20000L));
    return message;
  });
}

  由于队列的先进先出特性,只有当过期的消息到了队列的顶端(队首),才会被真正的丢弃或者进入死信队列。所以当我们设置了一个较短的消息超时时间,但是因为它之前有队列尚未完结,故此消息不会进入死信。

5.2 消息重发

  消息重发流程如下所示,包括工作队列、重发队列和失败队列。在重发队列中设置延迟发送时间,并将其死信队列和私信交换器绑定为工作队列和工作交换器,当重试次数超过设置的次数,将消息发送到失败队列等待特定消费者处理或者人工处理。

图5-2 消息重发流程图.png

  按照图5-2的流程图分别设置工作队列、重发队列和失败队列以及其关联的交换器和绑定组件。

  工作队列、工作交换器以及工作绑定组件如下所示:

//工作队列
@Bean
public Queue workQueue() {
    return new Queue("workQueue");
}
//工作交换器
@Bean
public TopicExchange workExchange() {
    return new TopicExchange("workExchange");
}
//工作绑定
@Bean
public Binding topicQueueBinding(Queue workQueue, TopicExchange workExchange) {
    return BindingBuilder.bind(workQueue).to(workExchange).with("work.topic.*");
}

  重发队列、重发交换器以及重发绑定组件如下所示:

//重发队列
@Bean
public Queue retryQueue() {
    return QueueBuilder.durable("retryQueue")
        // DLX,dead letter发送到的exchange ,设置死信队列交换器到处理交换器
        .withArgument(IRabbitMQConst.DEAD_LETTER_EXCHANGE_ARGUMENT, "workExchange")
        // dead letter携带的routing key,配置处理队列的路由key
        .withArgument(IRabbitMQConst.DEAD_LETTER_ROUTEKEY_ARGUMENT,"work.topic.retry")
        // 设置过期时间
        .withArgument(IRabbitMQConst.MESSAGE_TTL_ARGUMENT, 3000L)
        .build();
}
//重发交换器
@Bean
public TopicExchange retryExchange() {
    return ExchangeBuilder.topicExchange("retryExchange").durable(true).build();
}
//重发绑定
@Bean
public Binding retryDirectBinding(@Qualifier("retryQueue") Queue retryQueue,
                                      @Qualifier("retryExchange") TopicExchange retryExchange) {
    return BindingBuilder.bind(retryQueue).to(retryExchange).with("work.retry.*");
}

  失败队列、失败交换器以及失败绑定组件如下所示:

//失败队列
@Bean
public Queue failQueue() {
    return QueueBuilder.durable("failQueue").build();
}
//失败交换器
@Bean
public TopicExchange failExchange() {
    return ExchangeBuilder.topicExchange("failExchange").durable(true).build();
}
//失败绑定
@Bean
public Binding failDirectBinding(@Qualifier("failQueue") Queue failQueue,@Qualifier("failExchange") TopicExchange failExchange) {
    return BindingBuilder.bind(failQueue).to(failExchange).with("work.fail.*");
}

  消息接收端接收消息,发送异常时将消息推送到重发队列,重发超过3次将消息推送到失败队列。

@RabbitListener(queues = "workQueue")
public void getMsg3(Message message, Channel channel) throws IOException {
    System.out.println("消息已收到:"+LocalDateTime.now().toString());
    logger.info("收到消息");
    try {
        //模拟异常
        int a = 1/0;
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    } catch (Exception e) {
        long retryCount = getRetryCount(message.getMessageProperties());
        if(retryCount >= 3){
            System.out.println("重发超过三次,发送到失败队列!");
            rabbitTemplate.convertAndSend("failExchange","work.fail.queue",message);
        }else{
            System.out.println("第" + (retryCount+1) + "次重发!");
             rabbitTemplate.convertAndSend("retryExchange","work.retry.queue",message);
        }
    }finally {
        //避免死循环  
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

  消息重发测试:

@GetMapping(value = "retry")
public void retryTest() {
    rabbitTemplate.convertAndSend("workExchange", "work.topic.queue", "hello world!");
}

Spring Boot 整合——RabbitMQ配置延时队列和消息重试
RabbitMQ实现消息的消费确认,死信队列、延迟重试队列

六、消息幂等性

  幂等性:任意多次执行所产生的影响均与一次执行的影响相同。
  消息幂等性实现思路如下所示,客户端实现思路类似,不再赘述,具体实现细节请查看源码。

https://github.com/just-right/rabbitmq

图6-1 消息幂等性实现思路.png

  配置MQ时如果配置了json解析器,则程序会走自动确认消费,配置文件的配置不生效。手动添加RabbitListenerContainerFactory的配置,设置手动确认。

@Bean
public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    factory.setMessageConverter(new Jackson2JsonMessageConverter());
    factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    return factory;
}

RabbitMQ消息幂等性问题
RabbitMQ消息可靠性投递与消费,消费幂等
springBoot-rabbit MQ-设置手动确认ACK-Channel shutdown异常

七、综合测试

  使用PostMan进行测试:

图7-1 请求测试.png

  测试结果如下所示:

图7-2 测试结果.png

相关文章

网友评论

      本文标题:Spring Boot整合RabbitMQ

      本文链接:https://www.haomeiwen.com/subject/nvsqnhtx.html