主题模式(Topic)
主体模式其实就是在路由模式的基础上,支持了对key的通配符匹配(星号以及井号),以满足更加复杂的消息分发场景。
“#” : 匹配一个或者多个
“*”:匹配一个
例如上图中,lazy.#可以匹配到key=lazy.a或者key=lazy.a.b。.orange只能匹配到a.orange,无法匹配a.b.orange
代码
public class Send {
private static final String EXCHANGE_NAME = "topic";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
// 声明exchange
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
// 推送消息到交换机时携带key
String msg1 = "error.log";
String key1 = "error.log";
channel.basicPublish(EXCHANGE_NAME, key1, null, msg1.getBytes());
String msg2 = "success.log";
String key2 = "success.log";
channel.basicPublish(EXCHANGE_NAME, key2, null, msg2.getBytes());
String msg3 = "a.b.log";
String key3 = "a.b.log";
channel.basicPublish(EXCHANGE_NAME, key3, null, msg3.getBytes());
channel.close();
connection.close();
}
}
public class Rec1 {
private static final String EXCHANGE_NAME = "topic";
private static final String QUEUE_NAME = "topic_queue1";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
// 队列声明
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 交换机绑定,绑定交换机时携带key
String key = "#.log";
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, key);
// 接收到消息后的回调函数
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(LocalDateTime.now().toString() + " [x] Received '" + message + "'");
};
// 监听队列,每当队列中接收到新消息后会触发回调函数
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
}
}
public class Rec2 {
private static final String EXCHANGE_NAME = "topic";
private static final String QUEUE_NAME = "topic_queue2";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
// 队列声明
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 交换机绑定,绑定交换机时携带key
String key = "*.log";
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, key);
// 接收到消息后的回调函数
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(LocalDateTime.now().toString() + " [x] Received '" + message + "'");
};
// 监听队列,每当队列中接收到新消息后会触发回调函数
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
}
}
运行结果
rec1 rec2其中rec1中的key为“#.log”,rec2中的key为“*.log”。结合上图的运行结果,我们更好理解型号以及井号的匹配规则。
“#.log”匹配了error.log、success.log以及a.b.log
“.log”只匹配了error.log以及success.log
“#” :匹配一个或者多个
“*”:匹配一个
关键代码
生产者:
// 生产者声明exchange成topic类型
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
// 生产者发送消息的同时携带上key
String msg1 = "error.log";
String key1 = "error.log";
channel.basicPublish(EXCHANGE_NAME, key1, null, msg1.getBytes());
消费者:
// 交换机绑定,绑定交换机时携带key
String key = "*.log";
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, key);
网友评论