1.加入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.定义常量
- 定义交换器常量
public class Exchanges {
public static final String MESSAGE_EXCHANGE = "messageExchange";
}
- 定义消息队列常量
public class Queues {
public final static String WEIXIN_MESSAGE = "message.weixin.queue";
public final static String SMS_MESSAGE = "message.sms.queue";
public final static String XINGE_MESSAGE = "message.xinge.queue";
}
- 定义路由键常量
public class RoutingKey {
public final static String WEIXIN_MESSAGE_KEY = "message.weixin.key";
public final static String SMS_MESSAGE_KEY = "message.sms.key";
public final static String XINGE_MESSAGE_KEY = "message.xinge.key";
}
3.配置消息队列、交换器和绑定
@Configuration
public class MessageMqConfig {
@Bean
public Queue weixinQueue() {
return new Queue(Queues.WEIXIN_MESSAGE);
}
@Bean
public Queue smsQueue() {
return new Queue(Queues.SMS_MESSAGE);
}
@Bean
public Queue xingeQueue() {
return new Queue(Queues.XINGE_MESSAGE);
}
/**
* 创建交换器
*
* @return
*/
@Bean
public TopicExchange messageExchange() {
return new TopicExchange(Exchanges.MESSAGE_EXCHANGE);
}
/**
* 对列绑定并关联到ROUTINGKEY
*
* @param weixinQueue
* @param messageExchange
* @return
*/
@Bean
public Binding bindingWeixinExchangeQueue(Queue weixinQueue, TopicExchange messageExchange) {
return BindingBuilder.bind(weixinQueue).to(messageExchange).with(RoutingKey.WEIXIN_MESSAGE_KEY);
}
@Bean
public Binding bindingSmsExchangeQueue(Queue smsQueue, TopicExchange messageExchange) {
return BindingBuilder.bind(smsQueue).to(messageExchange).with(RoutingKey.SMS_MESSAGE_KEY);
}
@Bean
public Binding bindingXingeExchangeQueue(Queue xingeQueue, TopicExchange messageExchange) {
return BindingBuilder.bind(xingeQueue).to(messageExchange).with(RoutingKey.XINGE_MESSAGE_KEY);
}
}
4.创建消息生产者和消费者
- 消费者
@Component
public class MessageConsumer {
@RabbitListener(queues = Queues.WEIXIN_MESSAGE)
@RabbitHandler
public void weixinHandler(String message){
System.out.println("weixin:"+message);
}
@RabbitListener(queues = Queues.SMS_MESSAGE)
@RabbitHandler
public void smsHandler(String message){
System.out.println("sms:"+message);
}
@RabbitListener(queues = Queues.XINGE_MESSAGE)
@RabbitHandler
public void xingeHandler(String message){
System.out.println("xinge:"+message);
}
}
- 生产者
@RestController
public class ProviderController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/send")
public String sendMessage(String message, Integer type) {
if (type == 0) {
rabbitTemplate.convertAndSend(Exchanges.MESSAGE_EXCHANGE, RoutingKey.WEIXIN_MESSAGE_KEY, message);
} else if (type == 1) {
rabbitTemplate.convertAndSend(Exchanges.MESSAGE_EXCHANGE, RoutingKey.SMS_MESSAGE_KEY, message);
} else {
rabbitTemplate.convertAndSend(Exchanges.MESSAGE_EXCHANGE, RoutingKey.XINGE_MESSAGE_KEY, message);
}
return "发送成功";
}
}
5.运行
启动项目,打开Rabbitmq Management页面,点击Queues,可以看到所有的消息队列。
点击一个队列,看到该消息队列的绑定关系,如message.sms.queue是通过路由键message.sms.key从messageExchange转发而来。
队列bindings
点击Exchanges,可以看到所有的交换器。
交换器
点击messageExchange,可以看到交换器和消息队列的绑定关系。从这张图可以看出,生产者通过
rabbitTemplate.convertAndSend("messageExchange", RoutingKey.WEIXIN_MESSAGE_KEY, message)
发送消息到交换器上,并确定其路由键,消息发送到交换器上后,通过Routing Key将消息发送到相对应的消息队列中,消费者通过队列名订阅消息。交换器bindings
rabbitmq 的topic模式自由度较高的原因是因为可以通过交换器和队列的绑定动态订阅,关键代码如下
@Bean
public Binding bindingXingeExchangeQueue(Queue xingeQueue, TopicExchange messageExchange) {
return BindingBuilder.bind(xingeQueue).to(messageExchange).with(RoutingKey.XINGE_MESSAGE_KEY);
}
如果更改绑定的Routing Key,此时发送到交换器中的消息将会根据Rounting Key发送到不同的消息队列中。如果将此段代码改成如下
@Bean
public Binding bindingXingeExchangeQueue(Queue xingeQueue, TopicExchange messageExchange) {
return BindingBuilder.bind(xingeQueue).to(messageExchange).with("message.#.key");
}
则之前的三种方式,都会转发到message.xinge.queue队列中。
6. Topic通配符
通配符 | 解释 |
---|---|
* | 可以(只能)匹配一个单词 |
# | 可以匹配多个单词(或者零个) |
网友评论