简书:亚武de小文 【原创:转载请注明出处】
主题交换机模式(Topic)
通配符工作模式
LengToo上学.pngRabbitMQ有以下几种工作模式 :
- Work queues
- Publish/Subscribe
- Routing
- Topic
- Headers
- RPC
Topic
模型图
[亚武de小文]Topic模型图.pngProducer:生产者,是发消息方。
Exchanger:路由器交换机,此处类型为topic,它会把消息的路由键取出来,与绑定它的队列做路由键匹配。如果队列关心的路由键能匹配上消息的路由键,则将这个消息投递到这个队列中。
Queue01:一个队列,它是关心orange颜色的队列。
Queue02:一个队列,它是关心rabbit与lazy的队列。
Consumer01:消费者,从Queue01中获取消息。
Consumer01:消费者,从Queue02中获取消息。
* 可以替代一个单词
# 可以替换零个或多个单词
主题交换机模式:
1、每个消费者监听自己的队列,并且设置带通配符的routingkey。
2、生产者将消息发给broker,由交换机根据routingkey来转发消息到指定的队列。
参考代码
生产者
-
队列绑定交换机指定通配符:【统配符规则】
中间以“.”分隔。符号#可以匹配多个词,符号*可以匹配一个词语。
声明交换机,指定topic类型:
-
Producer.java
package com.yawu.xiaowen.topic; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * Topic交换机 * 生产者 * * @author yawu * @date 2019.06.30 */ public class Producer { private static final Logger LOGGER = LoggerFactory.getLogger(Producer.class); private static final String EXCHANGE_NAME = "mq_topic_exchange"; public static void main(String[] args) { try { // RabbitMQ建立连接的管理器 ConnectionFactory factory = new ConnectionFactory(); // 设置服务器地址 factory.setHost("127.0.0.1"); factory.setUsername("guest"); factory.setPassword("guest"); // 创建一个连接 Connection connection = factory.newConnection(); // 创建一个信道 Channel channel = connection.createChannel(); String message = "开始发送信息-topic交换机"; //声明一个TOPIC类型的交换机 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); // 向交换机发送消息 // 第一种情况:发送【*.orange.*】通配符消息 channel.basicPublish(EXCHANGE_NAME, "like.orange.color", null, message.getBytes("UTF-8")); // 第二种情况:发送【lazy.#】通配符消息 // channel.basicPublish(EXCHANGE_NAME, "lazy.boy.girl", null, message.getBytes("UTF-8")); LOGGER.info("消息发送成功:{}", message); channel.close(); connection.close(); } catch (Exception e) { e.printStackTrace(); } } }
消费者
-
Consumer01.java
package com.yawu.xiaowen.topic; import com.rabbitmq.client.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; /** * Topic交换机 * 消费者01 * @date 2019.06.30 * @author yawu */ public class Consumer01 { private static final Logger LOGGER = LoggerFactory.getLogger(Producer.class); private static final String EXCHANGE_NAME = "mq_topic_exchange"; private static final String QUEUE_NAME_01 = "mq_topic_queue_01"; private static final String QUEUE_NAME_02 = "mq_topic_queue_02"; public static void main(String[] args) { try { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //声明一个Topic类型的交换机 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); // 声明两个队列 /** * 【参数详解】 * queue:要创建的队列名 * durable:是否持久化。如果为true,可以在RabbitMQ崩溃后恢复消息 * exclusive:true表示一个队列只能被一个消费者占有并消费 * autoDelete:true表示服务器不在使用这个队列是会自动删除它 * arguments:其它参数 */ channel.queueDeclare(QUEUE_NAME_01, true, false, false, null); channel.queueDeclare(QUEUE_NAME_02, true, false, false, null); final String ROUTING_KEY_ORANGE = "*.orange.*"; final String ROUTING_KEY_LAZY = "lazy.#"; // 队列一对ORANGE感兴趣,匹配 XXX.orange.XXX 的消息 channel.queueBind(QUEUE_NAME_01, EXCHANGE_NAME, ROUTING_KEY_ORANGE); // 队列二对LAZY感兴趣,匹配 lazy.XXX.XXX.XXX channel.queueBind(QUEUE_NAME_02, EXCHANGE_NAME, ROUTING_KEY_LAZY); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { final String message = new String(body, "UTF-8"); LOGGER.info("队列一收到消息:{}", message); } }; // 队列一确认消息 channel.basicConsume(QUEUE_NAME_01, true, consumer); } catch (Exception e) { LOGGER.error("an exception was occurred , caused by :{}", e.getMessage()); } } }
-
Consumer02.java
package com.yawu.xiaowen.topic; import com.rabbitmq.client.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; /** * Topic交换机 * 消费者02 * @date 2019.06.30 * @author yawu */ public class Consumer02 { private static final Logger LOGGER = LoggerFactory.getLogger(Producer.class); private static final String EXCHANGE_NAME = "mq_topic_exchange"; private static final String QUEUE_NAME_01 = "mq_topic_queue_01"; private static final String QUEUE_NAME_02 = "mq_topic_queue_02"; public static void main(String[] args) { try { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //声明一个Topic类型的交换机 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); // 声明两个队列 /** * 【参数详解】 * queue:要创建的队列名 * durable:是否持久化。如果为true,可以在RabbitMQ崩溃后恢复消息 * exclusive:true表示一个队列只能被一个消费者占有并消费 * autoDelete:true表示服务器不在使用这个队列是会自动删除它 * arguments:其它参数 */ channel.queueDeclare(QUEUE_NAME_01, true, false, false, null); channel.queueDeclare(QUEUE_NAME_02, true, false, false, null); final String ROUTING_KEY_ORANGE = "*.orange.*"; final String ROUTING_KEY_LAZY = "lazy.#"; // 队列一对ORANGE感兴趣,匹配 XXX.orange.XXX 的消息 channel.queueBind(QUEUE_NAME_01, EXCHANGE_NAME, ROUTING_KEY_ORANGE); // 队列二对LAZY感兴趣,匹配 lazy.XXX.XXX.XXX channel.queueBind(QUEUE_NAME_02, EXCHANGE_NAME, ROUTING_KEY_LAZY); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { final String message = new String(body, "UTF-8"); LOGGER.info("队列二收到消息:{}", message); } }; //队列一确认消息 channel.basicConsume(QUEUE_NAME_02, true, consumer); } catch (Exception e) { LOGGER.error("an exception was occurred , caused by :{}", e.getMessage()); } } }
测试及运行分析
-
启动生产者服务:(先以第一种情况
启动生产者服务.png*.orange.*
发送)
-
打开RabbitMQ的管理界面,查看绑定关系
查看绑定关系.png -
发现第一种情况自动匹配到了Queue01
匹配队列01.png -
【同理】,第二种情况此处略过。
启动消费者服务,对消息进行消费
启动消费者.png
消息消费.png -
至此,Topic主题通配符交换机学习结束。
网友评论