springboot2.0集成rabbitmq
最近需要用上rabbitmq,项目是用springboot搭建的,所以就要在springboot环境下单独集成rabbitmq,写这篇文章用来记录下。不多说了,进入正题。
1.首先引入Maven依赖:spring-boot-starter-amqp
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.application.yml中引入配置项:
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: root
password: root
virtual-host: vhost_one
publisher-confirms: true #开启发送确认
publisher-returns: true #开启发送失败退回
3.生产者:
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
@EnableAutoConfiguration
public class RabbitMqConfig {
@Bean
public RabbitTemplate setRabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate();
rabbitTemplate.setConnectionFactory(connectionFactory);
rabbitTemplate.setExchange("topicExchange");
// 设置消息内容序列化方式
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
return rabbitTemplate;
}
/**
* 生成主题路由器
* @return
*/
@Bean("topicExchange")
public TopicExchange topicExchange() {
return new TopicExchange("topicExchange", true, false);
}
/**
* 定义发送短信的队列
* @return
*/
@Bean("sendSMSQueue")
public Queue sendSMSQueue() {
return new Queue("sendSMS", true, false, false);
}
/**
* 创建发送短信的队列和主题交换机的绑定关系
* @param queue
* @param exchange
* @return
*/
@Bean
public Binding bingExchangeANDqueue(@Qualifier("sendSMSQueue")Queue queue, @Qualifier("topicExchange")TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("common.sendSMS.#");
}
}
上面配置好了RabbitTemplate和路由和队列以及绑定关系,在项目中直接调用RabbitTemplate的发送消息的方法即可:
@Autowired
private RabbitTemplate rabbitTemplate;
@Override
public void send() {
rabbitTemplate.convertAndSend("common.sendSMS.msg", "rabbitmq发送短信1给你了");
}
4.消费者:
application.yml中引入配置项和生产者一样:
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: root
password: root
virtual-host: vhost_one
同时也需要设置RabbitConfiguration来设置序列化的方式,就用spring提供的Jackson2JsonMessageConverter类来方序列化和序列化,RabbitConfiguration类如下:
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitConfiguration {
@Bean
public SimpleRabbitListenerContainerFactory myFactory(
SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory =
new SimpleRabbitListenerContainerFactory();
configurer.configure(factory, connectionFactory);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
return factory;
}
}
调用示例:
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@EnableRabbit
public class Consumer {
@RabbitListener(queues = "sendSMS", containerFactory = "myFactory")
public void sendMsg(String content) {
System.out.println(content);
}
}
网友评论