RabbitMQ官网中文版教程:
http://rabbitmq.mr-ping.com/tutorials_with_python/[4]Routing.html
上述教程示例为Python版,Java版及相应解释如下:
生产者
package com.xc.rabbit.routing;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
 * Producer for the direct-exchange routing example: publishes one message per
 * severity to the "direct_logs" exchange, using the severity as the routing key.
 *
 * Created by xc.
 */
public class RoutingSendDirect {
    private static final String EXCHANGE_NAME = "direct_logs";
    // Routing keys; one message is published for each of them
    private static final String[] routingKeys = new String[] {"info", "warning", "error"};

    /**
     * Connects to the broker on localhost:5672, declares the direct exchange and
     * publishes one message for every entry of {@code routingKeys}.
     *
     * @param args unused
     * @throws Exception if connecting, declaring the exchange or publishing fails
     */
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("rabbit");
        factory.setPassword("carrot");
        // Open a new connection
        Connection connection = factory.newConnection();
        try {
            // Open a channel on the connection
            Channel channel = connection.createChannel();
            // Declare the exchange
            channel.exchangeDeclare(EXCHANGE_NAME, "direct");
            // Publish the messages
            for (String severity : routingKeys) {
                String message = "Send the message level : " + severity;
                // Encode explicitly as UTF-8: the consumers decode with "UTF-8", so the
                // platform default charset must not be relied on here.
                channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));
                System.out.println(" [x] Sent '" + severity + "':'" + message + "'");
            }
            channel.close();
        } finally {
            // Release the connection even when declaring or publishing failed. The
            // isOpen() guard avoids a second failure from closing a connection that
            // is already closed, which would hide the original exception.
            if (connection.isOpen()) {
                connection.close();
            }
        }
    }
}
消费者1
package com.xc.rabbitmq.routing;
import com.rabbit.client.*;
import java.io.IOException;
/**
* Created by xc.
*/
public class ReceiveLogsDirect1 {
// 交换器名称
private static final String EXCHANGE_NAME = "direct_logs";
// 路由关键字
private static final String[] routingKey = new String[]{"info", "warning", "error"};
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("rabbit");
factory.setPassword("carrot");
// 创建一个新的连接
Connection connection = factory.newConnection();
// 创建一个频道
Channel channel = connection.createChannel();
// 声明交换器
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
// 获取匿名队列名称
String queueName = channel.queueDeclare().getQueue();
// 根据路由关键字进行多重绑定
for (String severity : routingKey) {
channel.queueBind(queueName, EXCHANGE_NAME, severity);
System.out.println("ReceiveLogsDirect1 exchange : " + EXCHANGE_NAME +
", queue : " + queueName + ", BindRoutingKey : " + severity);
}
System.out.println("ReceiveLogsDirect1 [*] Waiting for messages. To exit press CTRL + C");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");
}
};
channel.basicConsume(queueName, true, consumer);
}
}
消费者2
package com.xc.rabbit.routing;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
 * Consumer 2 of the direct-exchange routing example: binds its own queue to the
 * "direct_logs" exchange with the single binding key "error" and prints every
 * message delivered to that queue.
 *
 * Created by xc.
 */
public class ReceiveLogsDirect2 {
    // Exchange name
    private static final String EXCHANGE_NAME = "direct_logs";
    // Binding keys this consumer's queue is bound with
    private static final String[] routingKey = new String[]{"error"};

    /**
     * Declares the exchange, binds a freshly declared queue for each binding key
     * and starts consuming. Neither the connection nor the channel is closed
     * here; the program runs until it is terminated.
     *
     * @param args unused
     * @throws Exception if connecting, declaring, binding or subscribing fails
     */
    public static void main(String[] args) throws Exception {
        final Channel ch = openChannel();
        // Declare the exchange
        ch.exchangeDeclare(EXCHANGE_NAME, "direct");
        // Declare a queue without naming it and read back the name it was given
        final String boundQueue = ch.queueDeclare().getQueue();
        // Bind the same queue once per binding key
        for (int i = 0; i < routingKey.length; i++) {
            bindAndReport(ch, boundQueue, routingKey[i]);
        }
        System.out.println("ReceiveLogsDirect2 [*] Waiting for messages. To exit press CTRL + C");
        // Second argument is autoAck = true
        ch.basicConsume(boundQueue, true, new DefaultConsumer(ch) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // Decode the payload as UTF-8 and print it with the routing key it arrived under
                final String text = new String(body, "UTF-8");
                final String key = envelope.getRoutingKey();
                System.out.println(" [x] Received '" + key + "':'" + text + "'");
            }
        });
    }

    // Opens a connection to the broker on localhost:5672 and returns a new channel on it.
    private static Channel openChannel() throws Exception {
        ConnectionFactory cf = new ConnectionFactory();
        cf.setHost("localhost");
        cf.setPort(5672);
        cf.setUsername("rabbit");
        cf.setPassword("carrot");
        Connection conn = cf.newConnection();
        return conn.createChannel();
    }

    // Binds the queue to the exchange under one binding key and logs the binding.
    private static void bindAndReport(Channel ch, String queue, String bindingKey) throws IOException {
        ch.queueBind(queue, EXCHANGE_NAME, bindingKey);
        String report = "ReceiveLogsDirect2 exchange : " + EXCHANGE_NAME
                + ", queue : " + queue + ", BindRoutingKey : " + bindingKey;
        System.out.println(report);
    }
}
运行结果如下:
由图可知,生产者发出的消息,根据不同的路由,发送到不同的队列,进而被不同的消费者接收。
先跑消费者程序,再跑生产者程序。否则,生产者的消息到达交换器之后,如果没有队列连上交换器,则消息被直接丢弃。
注意:
- Bindings can take an extra routingKey parameter. To avoid the confusion with a basic_publish parameter we're going to call it a binding key.
binding key和routing key是一回事,为了避免概念混淆,channel.queueBind时叫binding key, channel.basicPublish时叫routing key。
- The routing algorithm behind a direct exchange is simple - a message goes to the queues whose binding key exactly matches the routing key of the message.
direct交换器的路由规则很简单,消息会路由到binding key与routing key相同的队列。
网友评论