工作流程:
- 主题交换机是通过对消息的路由键和队列到交换机的绑定模式之间的匹配,将消息路由给一个或
多个队列。- 主题交换机经常用来实现各种发布/订阅模式及其变种
- 主题交换机通常用来实现消息的多播路由
具体实现:
首先我们需要写一个配置类,具体代码如下:
package com.chuxin.fight.demo.rabbitmq.topic;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* <p>Discription: [广播模式配置类]</p>
* @author muyuanpei
* @date 2018/11/21
*/
@Configuration
public class RabbitTopicConfig {
final static String MESSAGE = "topic.message";
final static String MESSAGES = "topic.message.s";
final static String YMQ = "topic.ymq";
@Bean
public Queue queueMessage() {
return new Queue(RabbitTopicConfig.MESSAGE);
}
@Bean
public Queue queueMessages() {
return new Queue(RabbitTopicConfig.MESSAGES);
}
@Bean
public Queue queueYmq() {
return new Queue(RabbitTopicConfig.YMQ);
}
/**
* 交换机(Exchange) 描述:接收消息并且转发到绑定的队列,交换机不存储消息
*/
@Bean
TopicExchange topicExchange() {
return new TopicExchange("topicExchange");
}
//綁定队列 queueMessages() 到 topicExchange 交换机,路由键只接受完全匹配 topic.message 的队列接受者可以收到消息
@Bean
Binding bindingExchangeMessage(Queue queueMessage, TopicExchange topicExchange) {
return BindingBuilder.bind(queueMessage).to(topicExchange).with("topic.message");
}
//綁定队列 queueMessages() 到 topicExchange 交换机,路由键只要是以 topic.message 开头的队列接受者可以收到消息
@Bean
Binding bindingExchangeMessages(Queue queueMessages, TopicExchange topicExchange) {
return BindingBuilder.bind(queueMessages).to(topicExchange).with("topic.message.#");
}
//綁定队列 queueYmq() 到 topicExchange 交换机,路由键只要是以 topic 开头的队列接受者可以收到消息
@Bean
Binding bindingExchangeYmq(Queue queueYmq, TopicExchange topicExchange) {
return BindingBuilder.bind(queueYmq).to(topicExchange).with("topic.#");
}
}
接下来配置广播对应的几个接收消息类:
package com.chuxin.fight.demo.rabbitmq.topic;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* <p>Discription: [广播模式接受者]</p>
* @author muyuanpei
* @date 2018/11/21
*/
@Component
@RabbitListener(queues = "topic.message")
public class TopicReceiver1 {
@RabbitHandler
public void process(String message) {
System.out.println("接收者 TopicReceiver1," + message);
}
}
package com.chuxin.fight.demo.rabbitmq.topic;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* <p>Discription: [广播模式接受者]</p>
* @author muyuanpei
* @date 2018/11/21
*/
@Component
@RabbitListener(queues = "topic.message.s")
public class TopicReceiver2 {
@RabbitHandler
public void process(String message) {
System.out.println("接收者 TopicReceiver2," + message);
}
}
package com.chuxin.fight.demo.rabbitmq.topic;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* <p>Discription: [广播模式接受者]</p>
* @author muyuanpei
* @date 2018/11/21
*/
@Component
@RabbitListener(queues = "topic.ymq")
public class TopicReceiver3 {
@RabbitHandler
public void process(String message) {
System.out.println("接收者 TopicReceiver3," + message);
}
}
最后我们写一个测试类,测试我们的功能是否可行:
package com.chuxin.fight.demo.rabbitmq.topic;
import com.chuxin.fight.demo.DemoApplication;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
/**
* <p>Discription: [广播模式测试类]</p>
* @author muyuanpei
* @date 2018/11/21
*/
@RunWith(SpringRunner.class)
@SpringBootTest(classes = DemoApplication.class)
public class RabbitTopicTest {
@Autowired
private AmqpTemplate rabbitTemplate;
@Test
public void sendMessageTest() {
String context = "此消息在,配置转发消息模式队列下, 有 TopicReceiver1 TopicReceiver2 TopicReceiver3 可以收到";
String routeKey = "topic.message";
String exchange = "topicExchange";
context = "context:" + exchange + ",routeKey:" + routeKey + ",context:" + context;
System.out.println("sendMessageTest : " + context);
this.rabbitTemplate.convertAndSend(exchange, routeKey, context);
}
@Test
public void sendMessagesTest() {
String context = "此消息在,配置转发消息模式队列下,有 TopicReceiver2 TopicReceiver3 可以收到";
String routeKey = "topic.message.s";
String exchange = "topicExchange";
context = "context:" + exchange + ",routeKey:" + routeKey + ",context:" + context;
System.out.println("sendMessagesTest : " + context);
this.rabbitTemplate.convertAndSend(exchange, routeKey, context);
}
@Test
public void sendYmqTest() {
String context = "此消息在,配置转发消息模式队列下,有 TopicReceiver3 可以收到";
String routeKey = "topic.ymq";
String exchange = "topicExchange";
context = "context:" + exchange + ",routeKey:" + routeKey + ",context:" + context;
System.out.println("sendYmqTest : " + context);
this.rabbitTemplate.convertAndSend(exchange, routeKey, context);
}
}
这样我们就完成了一个主题交换机的使用以及编码。由于自己的服务器过期了,不能展示相应的打印结果。。。
网友评论