说明
在第四个教程中,我们对日志系统进行了改进,使用direct
类型的交换机代替了只能模拟广播的fanout
类型的交换机,使选择性的接收日志信息成为可能。但是它仍然存在局限性--它不能基于多个条件进行路由。比如在日志系统中,我们可能不止需要基于严重性的日志订阅,还需要根据发出的日志信息的源进行订阅。通过UNIX日志工具syslog
可能了解这个概念,它的路由日志既有严重程度(info/warn/crit...)又有工具(auth/cron/kern...)。这就给我们提供了很大的灵活性,我们想听来自cron的严重错误的日志以及kern的全部日志。
为了实现这个功能,我们需要学习更复杂的topic
交换机。
主题交换机(Topic exchange)
发送到topic
交换机的信息不能有任意的routing_key
,它必须是用点.
分隔的一组单词列表。这些单词可以是任何东西,但是通常它们都指定一些与消息相关的特性。一些有效的routing key的列子:stock.usd.nyse
,nyse.vmw
,quick.orange.rabbit
。
【注】routing key可以包含任意多个单词,但是最大支持255个字节(255 bytes)
binding key也必须采用相同的形式。topic
交换机的逻辑和direct
交换机的逻辑很相似:使用特定的路由键(routing key)发送的消息将被传递到使用匹配绑定键(binding key)绑定的所有队列中。但是绑定键具有两个重要的特例:
星号*
可以代替一个单词;井号#
可以代替零个或多个单词。

在上图所示的例子中,我们将发送所有描述动物的信息。这条消息将会使用包含三个单词(两个点)的路由键发送。第一个单词将描述速度,第二个单词描述颜色,第三个单词描述物种:<speed>.<colour>.<species>
。
图中创建了三个绑定,分别是:Q1绑定了*.orange.*
,Q2绑定了*.*.rabbit
和lazy.#
。可以理解为Q1对所有橙色的动物感兴趣,Q2对所有的兔子以及有关于懒惰的动物感兴趣。
当路由键被设置为quick.orange.rabbit
时两个队列都会接收到消息,设置为lazy.orange.elephant
时两个队列也都会接收到消息。当键为quick.orange.fox
时只有队列Q1会收到消息而键为lazy.brown.fox
只有队列Q2会收到消息。键为lazy.pink.rabbit
的消息只会被发送给Q2队列1次,虽然它匹配了两个绑定。没有匹配绑定的quick.brown.fox
的消息将会被丢弃。
如果我们违反约定发送了一个单词或者四个单词会怎样呢?像orange
或者quick.orange.male.rabbit
这样消息将不会匹配任何绑定并且会被丢弃;像lazy.orange.male.rabbit
将会匹配最后一个绑定然后传递到Q2队列中。
【Topic exchange】
Topic类型的交换机是非常强大的,它可以表现的和其他交换机一样:当队列使用
#
作为绑定键时它将接收所有的消息,就像fanout
交换机一样;当队列中不使用任何带有特殊字符(* 和 #
)的绑定时topic交换机的行为就会像一个direct
交换机。
完整的代码
EmitLogTopic.java
(https://github.com/rabbitmq/rabbitmq-tutorials/blob/master/java/EmitLogTopic.java):
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class EmitLogTopic {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String routingKey = getRouting(argv);
String message = getMessage(argv);
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
}
}
//..
}
ReceiveLogsTopic.java
(https://github.com/rabbitmq/rabbitmq-tutorials/blob/master/java/ReceiveLogsTopic.java):
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
public class ReceiveLogsTopic {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String queueName = channel.queueDeclare().getQueue();
if (argv.length < 1) {
System.err.println("Usage: ReceiveLogsTopic [binding_key]...");
System.exit(1);
}
for (String bindingKey : argv) {
channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
}
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" +
delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
}
}
网友评论