pom.xml 中引入依赖
<!-- Spring Boot QMQP Starter-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>${springboot.version}</version>
</dependency>
- 声明
ConnectionFactory
/**
* 声明连接工厂
* https://docs.spring.io/spring-amqp/docs/2.1.0.RELEASE/reference/html/_reference.html#_rabbitconnectionfactorybean_and_configuring_ssl
* @return
*/
@Bean(name = CONNECTION_FACTORY)
public ConnectionFactory defaultConnectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setAddresses(rabbitProperties.getAddresses());
connectionFactory.setUsername(rabbitProperties.getUsername());
connectionFactory.setPassword(rabbitProperties.getPassword());
connectionFactory.setVirtualHost(rabbitProperties.getVirtualHost());
connectionFactory.setPublisherConfirms(true);
// 保证消息的事务性处理rabbitmq默认的处理方式为auto
// ack,这意味着当你从消息队列取出一个消息时,ack自动发送,mq就会将消息删除。而为了保证消息的正确处理,我们需要将消息处理修改为手动确认的方式
return connectionFactory;
}
- 声明
RabbitAdmin
配置Exchange
,Queue
,Binding
/**
* 自动生成交换器,队列,队列和交换器的绑定关系
* 声明多个 Exchanges, Queues, Bindings
* https://docs.spring.io/spring-amqp/docs/2.1.0.RELEASE/reference/html/_reference.html#collection-declaration
* @return
*/
@Bean
public RabbitAdmin rabbitAdmin() {
RabbitAdmin rabbitAdmin = new RabbitAdmin(defaultConnectionFactory());
// 声明交换器
Exchange exchange = ExchangeBuilder.topicExchange(rabbitProperties.getExchangeName()).durable(true).build();
rabbitAdmin.declareExchange(exchange);
// 声明队列
Queue queue = QueueBuilder.durable(rabbitProperties.getQueueName()).build();
rabbitAdmin.declareQueue(queue);
// 将队列和交换器绑定
Binding binding = BindingBuilder.bind(queue).to(exchange).with(rabbitProperties.getRoutingKey()).noargs();
rabbitAdmin.declareBinding(binding);
return rabbitAdmin;
}
- 声明模板方法
RabbitTemplate
,并配置消息转换器MessageConverter
/**
* 模板方法 - 必须是prototype类型,不然每次回调都是最后一个内容
* @return
*/
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public RabbitTemplate rabbitTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(defaultConnectionFactory());
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
return rabbitTemplate;
}
RabbitProperties.java 属性文件配置如下:
@Configuration
@ConfigurationProperties(prefix = "rabbit")
@PropertySource(value = "classpath:env/rabbit.properties", ignoreResourceNotFound = true, encoding = "UTF-8")
@Data
@NoArgsConstructor
@AllArgsConstructor
public class RabbitProperties {
private String exchangeName;
private String queueName;
private String routingKey;
private String addresses;
private String password;
private String username;
private String virtualHost;
}
rabbit.properties
# RabbitMQ服务器地址
rabbit.addresses=127.0.0.1:5672
# RabbitMQ服务器用户(默认为guest)
rabbit.username=admin
# RabbitMQ服务器密码(默认为guest)
rabbit.password=admin
rabbit.virtual-host=/test/host
rabbit.exchange-name=test.topic.exchange.name
rabbit.queue-name=test.topic.exchange.queue.name
rabbit.routing-key=test.routing.key
完整 RabbitConfiguration.java 配置文件如下:
@Configuration
@EnableConfigurationProperties(RabbitProperties.class)
public class RabbitConfiguration {
public static final String CONNECTION_FACTORY = "defaultConnectionFactory";
@Autowired
private RabbitProperties rabbitProperties;
/**
* 声明连接工厂
* https://docs.spring.io/spring-amqp/docs/2.1.0.RELEASE/reference/html/_reference.html#_rabbitconnectionfactorybean_and_configuring_ssl
* @return
*/
@Bean(name = CONNECTION_FACTORY)
public ConnectionFactory defaultConnectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setAddresses(rabbitProperties.getAddresses());
connectionFactory.setUsername(rabbitProperties.getUsername());
connectionFactory.setPassword(rabbitProperties.getPassword());
connectionFactory.setVirtualHost(rabbitProperties.getVirtualHost());
connectionFactory.setPublisherConfirms(true);
// 保证消息的事务性处理rabbitmq默认的处理方式为auto
// ack,这意味着当你从消息队列取出一个消息时,ack自动发送,mq就会将消息删除。而为了保证消息的正确处理,我们需要将消息处理修改为手动确认的方式
return connectionFactory;
}
/**
* 自动生成交换器,队列,队列和交换器的绑定关系
* 声明多个 Exchanges, Queues, Bindings
* https://docs.spring.io/spring-amqp/docs/2.1.0.RELEASE/reference/html/_reference.html#collection-declaration
* @return
*/
@Bean
public RabbitAdmin rabbitAdmin() {
RabbitAdmin rabbitAdmin = new RabbitAdmin(defaultConnectionFactory());
// 声明交换器
Exchange exchange = ExchangeBuilder.topicExchange(rabbitProperties.getExchangeName()).durable(true).build();
rabbitAdmin.declareExchange(exchange);
// 声明队列
Queue queue = QueueBuilder.durable(rabbitProperties.getQueueName()).build();
rabbitAdmin.declareQueue(queue);
// 将队列和交换器绑定
Binding binding = BindingBuilder.bind(queue).to(exchange).with(rabbitProperties.getRoutingKey()).noargs();
rabbitAdmin.declareBinding(binding);
return rabbitAdmin;
}
/**
* 模板方法 - 必须是prototype类型,不然每次回调都是最后一个内容
* @return
*/
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public RabbitTemplate rabbitTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(defaultConnectionFactory());
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
return rabbitTemplate;
}
/**
* 配置接收端属性
* @return
*/
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(defaultConnectionFactory());
factory.setPrefetchCount(1);
// 确认模式
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
// 接收端类型转化pojo,需要序列化
factory.setMessageConverter(new Jackson2JsonMessageConverter());
return factory;
}
}
注意在 @SpringBootApplication
取消了 Rabbit 的自动化配置
@SpringBootApplication(exclude = {
DataSourceAutoConfiguration.class,
MybatisAutoConfiguration.class,
MapperAutoConfiguration.class,
PageHelperAutoConfiguration.class,
RabbitAutoConfiguration.class,
MongoAutoConfiguration.class,
// SecurityAutoConfiguration.class
})
网友评论