springboot集成mq
基本使用
- pom 和 yml
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.2.5.RELEASE</version>
</dependency>
spring:
rabbitmq:
host: 119.29.14.214
port: 5672
username: lg123
password: lg123
virtualHost: /gzsz
publisher-confirm-type: correlated
template:
retry:
enabled: true
initial-interval: 10000ms
max-interval: 300000ms
multiplier: 2
exchange: topic.exchange
- 配置
@Configuration
public class RabbitmqConfig {
public static final String QUEUE_EMAIL = "queue_email";//email队列
public static final String QUEUE_SMS = "queue_sms";//sms队列
public static final String EXCHANGE_NAME="topic.exchange";//topics类型交换机
public static final String ROUTINGKEY_EMAIL="topic.#.email.#";
public static final String ROUTINGKEY_SMS="topic.#.sms.#";
//声明交换机
@Bean(EXCHANGE_NAME)
public Exchange exchange(){
//durable(true) 持久化,mq重启之后交换机还在
return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
}
//声明email队列
/*
* new Queue(QUEUE_EMAIL,true,false,false)
* durable="true" 持久化 rabbitmq重启的时候不需要创建新的队列
* auto-delete 表示消息队列没有在使用时将被自动删除 默认是false
* exclusive 表示该消息队列是否只在当前connection生效,默认是false
*/
@Bean(QUEUE_EMAIL)
public Queue emailQueue(){
return new Queue(QUEUE_EMAIL);
}
//声明sms队列
@Bean(QUEUE_SMS)
public Queue smsQueue(){
return new Queue(QUEUE_SMS);
}
//ROUTINGKEY_EMAIL队列绑定交换机,指定routingKey
@Bean
public org.springframework.amqp.core.Binding bindingEmail(@Qualifier(QUEUE_EMAIL) Queue queue,
@Qualifier(EXCHANGE_NAME) Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_EMAIL).noargs();
}
//ROUTINGKEY_SMS队列绑定交换机,指定routingKey
@Bean
public org.springframework.amqp.core.Binding bindingSMS(@Qualifier(QUEUE_SMS) Queue queue,
@Qualifier(EXCHANGE_NAME) Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_SMS).noargs();
}
}
- 消费者逻辑
@Component
public class ReceiveHandler {
//监听邮件队列
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "queue_email", durable = "true"),
exchange = @Exchange(
value = "topic.exchange",
ignoreDeclarationExceptions = "true",
type = ExchangeTypes.TOPIC
),
key = {"topic.#.email.#","email.*"}))
public void rece_email(String msg){
// System.out.println(" [邮件服务] received : " + msg + "!");
System.out.println(" [邮件服务] received : " + msg + "!");
String msg2 = "{\"name\":\"fs\",\"age\":12}";
JSONObject json = JSONObject.parseObject(msg2);
System.out.println(json);
String s = StringEscapeUtils.unescapeJava(msg);
JSONObject json2 = JSONObject.parseObject(s);
System.out.println(json2);
String age =String.valueOf(json2.get("age"));
System.out.println(age);
// System.out.println(jsonObject1);
// JSONObject jsonObject = JSONObject.parseObject(msg);
// System.out.println(jsonObject);
}
//监听短信队列
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "queue_sms", durable = "true"),
exchange = @Exchange(
value = "topic.exchange",
ignoreDeclarationExceptions = "true",
type = ExchangeTypes.TOPIC
),
key = {"topic.#.sms.#"}))
public void rece_sms(String msg){
System.out.println(" [短信服务] received : " + msg + "!");
}
}
成熟集成使用
- 配置
@Configuration
public class RabbitmqConfig2 {
// public static final String QUEUE_NAME_MERCHANT = "queue_merch";//有关店铺队列
// public static final String DIRECT_EXCHANGE_MERCHANT = "exchange_merch";//有关店铺交换机,类型direct
// public static final String ROUTING_KEY_MERCHANT = "routing_merch";//队列绑定指定了需要routing key(店铺的)
public final static String ROUTING_EXCHANGE = "routing1"; // 路由的交换机
public final static String ROUTING_1 = "routing_1"; //routing队列1
public final static String ROUTING_2 = "routing_2"; //routing队列2
public final static String ROUTING3_3 = "coupon_merch"; //routing队列3 不要用queue命名
// // 1. 定义队列
// @Bean("queue_merch")
// public Queue SearchMerchantDirectQueue() {
// return new Queue(QUEUE_NAME_MERCHANT, true);
// }
// // 2. 定义交换机
// @Bean("exchange_merch")
// public Exchange SearchMerchantDirectExchange() {
// return ExchangeBuilder.directExchange(DIRECT_EXCHANGE_MERCHANT).build();
// }
// // 3. 队列和交换机绑定
// public Binding MerchantQueueBind(@Qualifier(QUEUE_NAME_MERCHANT) Queue queue, @Qualifier(DIRECT_EXCHANGE_MERCHANT) Exchange exchange) {
// return BindingBuilder.bind(queue).to(exchange).with(QUEUE_NAME_MERCHANT).noargs();
// }
@Bean("routing_1")
public Queue OUTING_1_Queue() {
return new Queue(ROUTING_1, true, false, true);
}
@Bean("routing_2")
public Queue ROUTING_2_Queue() {
return new Queue(ROUTING_2, true, false, true);
}
@Bean("coupon_merch")
public Queue ROUTING_3_Queue() {
return new Queue(ROUTING3_3, true, false, true);
}
@Bean("routing1")
public Exchange routingExchange(){
return ExchangeBuilder.directExchange(ROUTING_EXCHANGE).build();
}
@Bean
public Binding binding_Routing_QUEUE_1(@Qualifier(ROUTING_1) Queue queue, @Qualifier(ROUTING_EXCHANGE) Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(ROUTING_1).noargs();
}
@Bean
public Binding binding_Routing_QUEUE_2(@Qualifier(ROUTING_2) Queue queue, @Qualifier(ROUTING_EXCHANGE) Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(ROUTING_2).noargs();
}
@Bean
public Binding binding_Routing_QUEUE_3(@Qualifier(ROUTING3_3) Queue queue, @Qualifier(ROUTING_EXCHANGE) Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(ROUTING3_3).noargs();
}
}
- 消费者
@Component
public class ReceiveHandler {
//监听邮件队列
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "queue_email", durable = "true"),
exchange = @Exchange(
value = "topic.exchange",
ignoreDeclarationExceptions = "true",
type = ExchangeTypes.TOPIC
),
key = {"topic.#.email.#","email.*"}))
public void rece_email(String msg){
// JSONObject jsonObject = JSU.parseObject(msg);
// cn.hutool.json.JSONObject jsonObject = JsonUtil.p;
// cn.hutool.json.JSONArray atno_list = jsonObject.getJSONArray("atno_list");
// JSONObject jsonObject1 = JSONObject.parseObject(msg);
// JSONUtil.parseObj(msg, )
// System.out.println(" [邮件服务] received : " + msg + "!");
// String msg2 = "{\"name\":\"fs\",\"age\":12}";
// JSONObject json = JSONObject.parseObject(msg2);
// System.out.println(json);
//
// String s = StringEscapeUtils.unescapeJava(msg);
// JSONObject json2 = JSONObject.parseObject(s);
// System.out.println(json2);
// String age =String.valueOf(json2.get("age"));
// System.out.println(age);
// System.out.println(jsonObject1);
// JSONObject jsonObject = JSONObject.parseObject(msg);
// System.out.println(jsonObject);
System.out.println(" [邮件服务] received : " + msg + "!");
JSONObject jsonObject = JSONObject.parseObject(msg);
String username = String.valueOf(jsonObject.get("username"));
String password = String.valueOf(jsonObject.get("password"));
System.out.println(username);
System.out.println(password);
}
//监听短信队列
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "queue_sms", durable = "true"),
exchange = @Exchange(
value = "topic.exchange",
ignoreDeclarationExceptions = "true",
type = ExchangeTypes.TOPIC
),
key = {"topic.#.sms.#"}))
public void rece_sms(String msg){
System.out.println(" [短信服务] received : " + msg + "!");
}
// 监听direct交换机的消息;queues:队列名称
// @RabbitListener(queues = {RabbitmqConfig2.QUEUE_NAME_MERCHANT})
// public void reciveDirectMsg(String msg) {
// System.out.println("reciveDirectMsg的消费者:" + msg);
//// System.out.println(msg);
// }
// 队列1的消费者
@RabbitListener(queues = RabbitmqConfig2.ROUTING_1)
public void queues_1(String msg) {
System.out.println("队列1的消费者:" + msg);
}
// 队列2的消费者
@RabbitListener(queues = RabbitmqConfig2.ROUTING_2)
public void queues_2(String msg) {
System.out.println("队列2的消费者:" + msg);
}
// 队列3的消费者
@RabbitListener(queues = RabbitmqConfig2.ROUTING3_3)
public void queues_333(String msg) {
System.out.println("队列3的消费者:" + msg);
}
}
- 生产者
//测试mq发送消息
@PostMapping("/getOrderNum2")
public String getOrderNum2() {
UserInfo userInfo = new UserInfo();
userInfo.setUsername("李三55");
userInfo.setPassword("12355");
String s = JsonUtils.toJSONString(userInfo);
// rabbitTemplate.convertAndSend(RabbitmqConfig2.DIRECT_EXCHANGE_MERCHANT,RabbitmqConfig2.ROUTING_KEY_MERCHANT,s);
rabbitTemplate.convertAndSend(RabbitmqConfig2.ROUTING_EXCHANGE, RabbitmqConfig2.ROUTING3_3,JsonUtils.toJSONString(userInfo));
return "调用成功!";
}
//测试mq发送消息
@PostMapping("/getOrderNum3")
public String getOrderNum3() {
for (int i = 0; i < 5; i++) {
rabbitTemplate.convertAndSend(RabbitmqConfig2.ROUTING_EXCHANGE, RabbitmqConfig2.ROUTING_1,i);
}
return "调用成功!";
}
网友评论