为了演示Topic模式,所以模拟一个需求:
用户A想接收开封市的天气
用户B想接收开封市的新闻
用户C想接收开封市的新闻和天气
1.创建基于SpringBoot的生产者模块和消费者模块,两个模块结构相同,在hello包下创建SpringBoot主启动类。
2.引入starter
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
3.在resources包下创建application.yml文件,内容如下(根据实际情况填写地址用户名密码等等):
4.在生产者模块创建config包,在此包下创建一个配置类(功能是声明队列,声明交换机,声明RoutingKey,声明队列和交换机的绑定关系),结构和内容如下:
package com.hello.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
//为了方便演示我都使用的public
//队列
public static final String QUEUE_NEWS = "queue_news";
public static final String QUEUE_WEATHER = "queue_weather";
public static final String QUEUE_NEWS_WEATHER = "queue_news_weather";
//交换机
public static final String EXCHANGE_TOPIC_KAIFENG = "exchange_topic_kaifeng";
//交换机与队列绑定的RoutingKey
public static final String ROUTINGKEY_NEWS = "*.news";
public static final String ROUTINGKEY_WEATHER = "*.weather";
public static final String ROUTINGKEY_ALL = "kaifeng.*";
//声明队列
@Bean(QUEUE_NEWS)
public Queue QUEUE_NEWS(){ //新闻的队列
return new Queue(QUEUE_NEWS);
}
@Bean(QUEUE_WEATHER)
public Queue QUEUE_WEATHER(){ //天气的队列
return new Queue(QUEUE_WEATHER);
}
@Bean(QUEUE_NEWS_WEATHER)
public Queue QUEUE_NEWS_WEATHER(){ //新闻和天气的队列
return new Queue(QUEUE_NEWS_WEATHER);
}
//声明交换机
@Bean(EXCHANGE_TOPIC_KAIFENG)
public Exchange EXCHANGE_TOPIC_INFORM(){
//声明了一个Topic类型的交换机,durable是持久化(重启rabbitmq这个交换机不会被自动删除)
return ExchangeBuilder.topicExchange(EXCHANGE_TOPIC_KAIFENG).durable(true).build();
}
//声明news队列和交换机绑定关系,并且指定RoutingKey
@Bean
public Binding NEWS_BINDING_TOPIC(@Qualifier(QUEUE_NEWS) Queue queue,
@Qualifier(EXCHANGE_TOPIC_KAIFENG) Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_NEWS).noargs();
}
//声明weather队列和交换机绑定关系,并且指定RoutingKey
@Bean
public Binding WEATHER_BINDING_TOPIC(@Qualifier(QUEUE_WEATHER) Queue queue,
@Qualifier(EXCHANGE_TOPIC_KAIFENG) Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_WEATHER).noargs();
}
//声明news+weather队列和交换机绑定关系,并且指定RoutingKey
@Bean
public Binding NEWS_WEATHER_BINDING_TOPIC(@Qualifier(QUEUE_NEWS_WEATHER) Queue queue,
@Qualifier(EXCHANGE_TOPIC_KAIFENG) Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_ALL).noargs();
}
}
5. 在测试类发送消息
@SpringBootTest
@RunWith(SpringRunner.class)
public class RabbitTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void sendNews(){
//指定交换机,指定routing key,发送消息的内容
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_TOPIC_KAIFENG,"kaifeng.news","开封今年粮食产量提升10%");
}
@Test
public void sendWeather(){
//指定交换机,指定routing key,发送消息的内容
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_TOPIC_KAIFENG,"kaifeng.weather","开封明天白天多云15℃");
}
}
6.当前整个结构如下
目的.png7.我们也可以Rabbit网页管理平台上面查看队列和交换机都已经创建,交换机绑定的队列关系也已经绑定,消息因为没有消费者所以也是存储在MQ中。
交换机和队列的绑定关系7.在消费者模块创建一个类MQConsumer接收消息
123.png内容如下:
package com.hello.mq;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class MQConsumer {
//模拟三个用户接收
@RabbitListener(queues = {"queue_weather"})
public void getMessageA(String msg){
System.out.println("A用户想看天气接收到:" + msg);
}
@RabbitListener(queues = {"queue_news"})
public void getMessageB(Message message){ //我们也可以用Message作为参数接收
byte[] body = message.getBody();
String msg = new String(body);
System.out.println("B用户想看新闻接收到:" + msg);
}
@RabbitListener(queues = {"queue_news_weather"})
public void getMessageC(String msg){
System.out.println("C用户想看新闻和天气接收到:" + msg);
}
}
网友评论