美文网首页
Spring Boot RabbitMQ消息收发

Spring Boot RabbitMQ消息收发

作者: peteLee | 来源:发表于2018-03-19 09:25 被阅读0次

    组件

    RabbitMQ是AMQP协议的实现。它主要包括以下组件:

    • Server: RabbitMQ实例
    • Vertual Host: 是一个虚拟概念,用于权限控制,一个Virtual Host里面可以有若干个Exchange和Queue,不同Vertual Host不可见。
    • Queue:消息队列,用于存储还未被消费者消费的消息。
    • Exchange:接受生产者发送的消息,根据自身模式转法给不同的Queue。常用的有3种direct、Fanout和Topic。
    • Routing Key:发送方发送消息时,需要指定exchange与routing key,在direct、topic模式的exchange下结合binding key决定消息路由到哪个Queue中。
    • Binding Key:消息接收方创建Queue与Exchange的绑定规则时指定,就是Routing Key的正则,代表条件的Routing Key会被转发到本队列。

    例子

    这里举一个Topic模式的例子。

    • 发送方配置
    spring.application.name=spring-boot-rabbitmq
    spring.rabbitmq.host=10.20.2.240
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=guest
    spring.rabbitmq.publisher-confirms=false
    spring.rabbitmq.virtual-host=/
    
    • 发送方配置类:
    @Configuration
    public class RabbitConfig {
      @Bean
      public TopicExchange topicExCertification() {
        return new TopicExchange("certification_exchange");
      }
    }
    
    • 发送类(不要在意Msg类,是我自定义的,测试直接扔个字符串过去就行)
    @Service
    public class Sender {
    
      @Autowired
      private AmqpTemplate rabbitTemplate;
    
      public void topicSendMessage(final String data) {
        final String context = " -- from RoutingKey : topic.message";
        final Msg msg = Msg.builder().data(data + context).sendTime(new DateTime()).build();
        System.out.println("Sender : " + msg.toString());
        // exchangeName, routingKey, msg
        rabbitTemplate.convertAndSend(Constant.TOPIC_EX, "topic.message", msg);
      }
    }
    
    • 接收方配置
    spring.application.name=spring-boot-rabbitmq
    spring.rabbitmq.host=10.20.2.240
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=guest
    spring.rabbitmq.listener.simple.acknowledge-mode=none
    spring.rabbitmq.virtual-host=/
    
    • 接收方配置类
    @Configuration
    public class RabbitConfig {
    
      @Bean
      public TopicExchange topicExchange() {
        return new TopicExchange(Constant.TOPIC_EX);
      }
      
      // topic 方式
      @Bean
      public Queue queueMessageAll() {
        return new Queue(Constant.Q_TOPIC_MESSAGE_ALL);
      }
    
      /**
       * binding_key=topic.# 模糊匹配routing_key为topic.xxx发过来的消息加放队列
       */
      @Bean
      public Binding bindingExchangeMessageAll(final Queue queueMessageAll, final TopicExchange topicExchange) {
        return BindingBuilder.bind(queueMessageAll).to(topicExchange).with("topic.#");
      }
    }
    
    • 接收方监听队列类
    @Component
    public class Receiver {
    
      @RabbitListener(queues = Constant.Q_TOPIC_MESSAGE_ALL)
      @RabbitHandler
      public void topicReceiver2(final Msg msg) {
        System.out.println("Topic Message All Receiver:" + msg.toString());
      }
    }
    

    网上看到一些例子,发送方和接收方都写在一起,让人看起来很懵,
    虽然 Exchange,Queue,Binding,都可以在一端建,不过本着设计原则,这里建议:

    1. 发送方只关注往哪个Exchange 用什么规则(Routing key)发,因为你如果对外服务,只需要发布这个事件,而不需要关注谁要消费
    2. 接收方需要定义对列,并将对列以某规则(Binding key)绑定到Exchange上,所以你要知道Exchange、Queue、Binding key。

    接收方启用ACK如何回复

    直接看代码

    spring.rabbitmq.listener.simple.acknowledge-mode=manual
    
      @RabbitListener(queues = Constant.Q_TOPIC_MESSAGE_ALL)
      @RabbitHandler
      public void topicReceiver2(final Message message, final Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {
        try {
          // Queue中的数据在message.getBody()里
        } catch (Exception ex) {
    
        } finally {
          channel.basicAck(deliveryTag, false);
        }
    }
    

    具体demo:参考git@github.com:spadekingdom/demo-mq.git

    相关文章

      网友评论

          本文标题:Spring Boot RabbitMQ消息收发

          本文链接:https://www.haomeiwen.com/subject/uwohqftx.html