美文网首页
SpringBoot RabbitMQ 整合

SpringBoot RabbitMQ 整合

作者: 赛亚人之神 | 来源:发表于2018-10-18 20:45 被阅读16次

pom.xml 中引入依赖


      <!-- Spring Boot QMQP Starter-->
      <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
        <version>${springboot.version}</version>
      </dependency>
  1. 声明 ConnectionFactory
/**
   * 声明连接工厂
   * https://docs.spring.io/spring-amqp/docs/2.1.0.RELEASE/reference/html/_reference.html#_rabbitconnectionfactorybean_and_configuring_ssl
   * @return
   */
  @Bean(name = CONNECTION_FACTORY)
  public ConnectionFactory defaultConnectionFactory() {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
    connectionFactory.setAddresses(rabbitProperties.getAddresses());
    connectionFactory.setUsername(rabbitProperties.getUsername());
    connectionFactory.setPassword(rabbitProperties.getPassword());
    connectionFactory.setVirtualHost(rabbitProperties.getVirtualHost());
    connectionFactory.setPublisherConfirms(true);
    // 保证消息的事务性处理rabbitmq默认的处理方式为auto
    // ack,这意味着当你从消息队列取出一个消息时,ack自动发送,mq就会将消息删除。而为了保证消息的正确处理,我们需要将消息处理修改为手动确认的方式

    return connectionFactory;
  }
  1. 声明 RabbitAdmin 配置 ExchangeQueueBinding
/**
   * 自动生成交换器,队列,队列和交换器的绑定关系
   * 声明多个 Exchanges, Queues, Bindings
   * https://docs.spring.io/spring-amqp/docs/2.1.0.RELEASE/reference/html/_reference.html#collection-declaration
   * @return
   */
  @Bean
  public RabbitAdmin rabbitAdmin() {
    RabbitAdmin rabbitAdmin = new RabbitAdmin(defaultConnectionFactory());
    // 声明交换器

    Exchange exchange = ExchangeBuilder.topicExchange(rabbitProperties.getExchangeName()).durable(true).build();
    rabbitAdmin.declareExchange(exchange);

    // 声明队列
    Queue queue = QueueBuilder.durable(rabbitProperties.getQueueName()).build();
    rabbitAdmin.declareQueue(queue);

    // 将队列和交换器绑定
    Binding binding = BindingBuilder.bind(queue).to(exchange).with(rabbitProperties.getRoutingKey()).noargs();
    rabbitAdmin.declareBinding(binding);

    return rabbitAdmin;
  }
  1. 声明模板方法 RabbitTemplate,并配置消息转换器 MessageConverter


  /**
   * 模板方法 - 必须是prototype类型,不然每次回调都是最后一个内容
   * @return
   */
  @Bean
  @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
  public RabbitTemplate rabbitTemplate() {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(defaultConnectionFactory());
    rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
    return rabbitTemplate;
  }

RabbitProperties.java 属性文件配置如下:

@Configuration
@ConfigurationProperties(prefix = "rabbit")
@PropertySource(value = "classpath:env/rabbit.properties", ignoreResourceNotFound = true, encoding = "UTF-8")
@Data
@NoArgsConstructor
@AllArgsConstructor
public class RabbitProperties {

  private String exchangeName;
  private String queueName;
  private String routingKey;

  private String addresses;
  private String password;
  private String username;
  private String virtualHost;
}

rabbit.properties

# RabbitMQ服务器地址
rabbit.addresses=127.0.0.1:5672
# RabbitMQ服务器用户(默认为guest)
rabbit.username=admin
# RabbitMQ服务器密码(默认为guest)
rabbit.password=admin
rabbit.virtual-host=/test/host
rabbit.exchange-name=test.topic.exchange.name
rabbit.queue-name=test.topic.exchange.queue.name
rabbit.routing-key=test.routing.key

完整 RabbitConfiguration.java 配置文件如下:

@Configuration
@EnableConfigurationProperties(RabbitProperties.class)
public class RabbitConfiguration {

  public static final String CONNECTION_FACTORY = "defaultConnectionFactory";

  @Autowired
  private RabbitProperties rabbitProperties;

  /**
   * 声明连接工厂
   * https://docs.spring.io/spring-amqp/docs/2.1.0.RELEASE/reference/html/_reference.html#_rabbitconnectionfactorybean_and_configuring_ssl
   * @return
   */
  @Bean(name = CONNECTION_FACTORY)
  public ConnectionFactory defaultConnectionFactory() {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
    connectionFactory.setAddresses(rabbitProperties.getAddresses());
    connectionFactory.setUsername(rabbitProperties.getUsername());
    connectionFactory.setPassword(rabbitProperties.getPassword());
    connectionFactory.setVirtualHost(rabbitProperties.getVirtualHost());
    connectionFactory.setPublisherConfirms(true);
    // 保证消息的事务性处理rabbitmq默认的处理方式为auto
    // ack,这意味着当你从消息队列取出一个消息时,ack自动发送,mq就会将消息删除。而为了保证消息的正确处理,我们需要将消息处理修改为手动确认的方式

    return connectionFactory;
  }

  /**
   * 自动生成交换器,队列,队列和交换器的绑定关系
   * 声明多个 Exchanges, Queues, Bindings
   * https://docs.spring.io/spring-amqp/docs/2.1.0.RELEASE/reference/html/_reference.html#collection-declaration
   * @return
   */
  @Bean
  public RabbitAdmin rabbitAdmin() {
    RabbitAdmin rabbitAdmin = new RabbitAdmin(defaultConnectionFactory());
    // 声明交换器

    Exchange exchange = ExchangeBuilder.topicExchange(rabbitProperties.getExchangeName()).durable(true).build();
    rabbitAdmin.declareExchange(exchange);

    // 声明队列
    Queue queue = QueueBuilder.durable(rabbitProperties.getQueueName()).build();
    rabbitAdmin.declareQueue(queue);

    // 将队列和交换器绑定
    Binding binding = BindingBuilder.bind(queue).to(exchange).with(rabbitProperties.getRoutingKey()).noargs();
    rabbitAdmin.declareBinding(binding);

    return rabbitAdmin;
  }

  /**
   * 模板方法 - 必须是prototype类型,不然每次回调都是最后一个内容
   * @return
   */
  @Bean
  @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
  public RabbitTemplate rabbitTemplate() {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(defaultConnectionFactory());
    rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
    return rabbitTemplate;
  }

  /**
   * 配置接收端属性
   * @return
   */
  @Bean
  public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(defaultConnectionFactory());
    factory.setPrefetchCount(1);
    // 确认模式
    factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    // 接收端类型转化pojo,需要序列化
    factory.setMessageConverter(new Jackson2JsonMessageConverter());

    return factory;
  }
}

注意在 @SpringBootApplication 取消了 Rabbit 的自动化配置

@SpringBootApplication(exclude = {
    DataSourceAutoConfiguration.class,
    MybatisAutoConfiguration.class,
    MapperAutoConfiguration.class,
    PageHelperAutoConfiguration.class,
    RabbitAutoConfiguration.class,
    MongoAutoConfiguration.class,

//    SecurityAutoConfiguration.class
})

相关文章

网友评论

      本文标题:SpringBoot RabbitMQ 整合

      本文链接:https://www.haomeiwen.com/subject/moaszftx.html