1.Topic exchange
为了实现组播,我们将exchange的类型设置为topic,同时指定routingKey,注意其routingKey的匹配规则:
- routingKey是一些单词构成,单词将作为最小的匹配单元,单词之间以.分割,上限255字节
- 引入* 和#两个通配符,*可以匹配一个单词,#可以匹配0个或多个单词
对于消费者,在队列绑定到交换器时,指定了每个队列的routingKey如上图所示;对于生产者发送如下routingKey的消息,最终匹配结果为:
- "quick.orange.rabbit" "lazy.orange.elephant" 将会发给Q1 Q2
- "quick.orange.fox" 将会发给Q1 * "lazy.brown.fox"*将会发给Q2
- "lazy.pink.rabbit" 将会发给Q2一次,尽管它匹配了两个绑定规则
- "quick.brown.fox" Q1,Q2都不匹配,将会被丢弃
- "orange" "quick.orange.male.rabbit" 将会被丢弃
- "lazy.orange.male.rabbit" 将会发给Q2
其实Topic Exchange是很灵活的,它可以"变身"成fanout和direct。
- 如果消费者绑定队列到交换器时,指定routingKey为#,将会匹配任何routingKey,就变成了fanout
- 如果不使用# *,直接使用明确的单词,比如"error" "info"就会变成direct
2.代码示例
生产者
//topic模式生产者
public class Send {
public static final String EXCHANGE_NAME = "exchange_topic";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
//声明交换机 topic 介于单播/广播之间,通过匹配进行多播
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
String msg = "hello topic";
//向交换机发送消息,由于交换机没有存储消息的能力,所以如果没有队列绑定到交换机,消息将被丢弃
//这里指定路由键为goods.add,表示发送添加商品的消息
channel.basicPublish(EXCHANGE_NAME, "goods.add.test", null, msg.getBytes());
System.out.println("向交换机发送了消息"+msg);
channel.close();
connection.close();
}
}
消费者
public class Receive1 {
public static final String QUEUE_NAME = "topic_queue1";
public static final String EXCHANGE_NAME = "exchange_topic";
public static void main(String[] args) throws Exception{
Connection connection = ConnectionUtils.getConnection();
final Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//绑定队列到转发器,这里指定路由键,#表示匹配任意个单词(0个 1个 n个),*表示匹配一个单词,单词之间用.分隔
//这里匹配goods后面带一个单词 匹配不到 goods.add.test 和 goods
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "goods.*");
//一次只发一条消息
channel.basicQos(1);
DefaultConsumer consumer = new DefaultConsumer(channel){
//收到消息就会触发
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
String msg = new String(body,"utf-8");
System.out.println("消费者1:"+msg);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//向生产者发送回执消息
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
//自动应答
boolean autoAck = false;
//监听队列
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
}
消费者
public class Receive2 {
public static final String QUEUE_NAME = "topic_queue2";
public static final String EXCHANGE_NAME = "exchange_topic";
public static void main(String[] args) throws Exception{
Connection connection = ConnectionUtils.getConnection();
final Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//绑定队列到转发器,这里指定路由键,#表示匹配任意个单词(0个 1个 n个),*表示匹配一个单词,单词之间用.分隔
//这里匹配goods后面带任意多个单词 可以匹配goods.add 和 goods.add.test 和 goods
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "goods.#");
//一次只发一条消息
channel.basicQos(1);
DefaultConsumer consumer = new DefaultConsumer(channel){
//收到消息就会触发
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
String msg = new String(body,"utf-8");
System.out.println("消费者2:"+msg);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//向生产者发送回执消息
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
//自动应答
boolean autoAck = false;
//监听队列
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
}
网友评论