在之前的Publish/Subscribe模式中,Exchange会将消息推送给所有绑定到它的队列,类似广播,那么如果我的某条消息只想发到指定的那个队列,类似单播,如何实现呢?这就需要用到Routing模式了:
![](https://img.haomeiwen.com/i12064261/07b6da6741e26fa7.png)
1.direct与binding key
其实理解了之前的Publish/Subscribe,Routing也就好理解了,就是广播和单播的区别,Publish/Subscribe会将消息发送到所有队列,Routing只会将消息发送到routingKey完全匹配的队列。我们只要把代码稍作改动:
- 声明交换器类型为direct
- 指定routingKey,生产者/消费者通过routingKey来识别
在Publish/Subscribe中我们是这样发消息的:
//声明交换器,类型fanout
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
//发送消息给交换器,广播不需要routingKey
channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes());
对于单播,我们需要这样写:
//声明交换器 direct类似于单播
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
//发消息给交换器,指定routingKey,
channel.basicPublish(EXCHANGE_NAME, "info", null, msg.getBytes());
对于消费者,我们需要指明对应的routingKey
之前的写法
//绑定队列到转发器
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
改成这样写
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");
2.完整代码
生产者
//路由模式生产者
public class Send {
public static final String EXCHANGE_NAME = "exchange_direct";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
//声明交换机 direct类似于单播
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
String msg = "hello routing";
//向交换机发送消息,由于交换机没有存储消息的能力,所以如果没有队列绑定到交换机,消息将被丢弃
//这里指定路由键为error,表示我发送error级别的日志
channel.basicPublish(EXCHANGE_NAME, "error", null, msg.getBytes());
channel.basicPublish(EXCHANGE_NAME, "info", null, msg.getBytes());
System.out.println("向交换机发送了消息"+msg);
channel.close();
connection.close();
}
}
消费者1
public class Receive1 {
public static final String QUEUE_NAME = "routing_queue1";
public static final String EXCHANGE_NAME = "exchange_direct";
public static void main(String[] args) throws Exception{
Connection connection = ConnectionUtils.getConnection();
final Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//绑定队列到转发器,这里指定路由键,这里的路由键只有和生产者的路由键匹配,队列才能收到消息
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
//一次只发一条消息
channel.basicQos(1);
DefaultConsumer consumer = new DefaultConsumer(channel){
//收到消息就会触发
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
String msg = new String(body,"utf-8");
System.out.println("消费者1:"+msg);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//向生产者发送回执消息
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
//自动应答
boolean autoAck = false;
//监听队列
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
}
消费者2
public class Receive2 {
public static final String QUEUE_NAME = "routing_queue2";
public static final String EXCHANGE_NAME = "exchange_direct";
public static void main(String[] args) throws Exception{
Connection connection = ConnectionUtils.getConnection();
final Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//绑定队列到转发器,这里绑定了三个路由键,表示能接收三种级别的日志消息
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "warning");
//一次只发一条消息
channel.basicQos(1);
DefaultConsumer consumer = new DefaultConsumer(channel){
//收到消息就会触发
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
String msg = new String(body,"utf-8");
System.out.println("消费者2:"+msg);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//向生产者发送回执消息
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
//自动应答
boolean autoAck = false;
//监听队列
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
}
error级别的消息,两个消费者都能收到,info级别的消息,就只有消费者2能收到了,不像之前两个队列都能收到全部的消息。
网友评论