美文网首页RabbitMQ工作生活亚武学习
Rabbitmq打怪升级之路(十一)Topic-主题通配符模式

Rabbitmq打怪升级之路(十一)Topic-主题通配符模式

作者: 亚武de小文 | 来源:发表于2019-07-01 10:20 被阅读0次

    简书:亚武de小文 【原创:转载请注明出处】

    主题交换机模式(Topic)

    通配符工作模式
    LengToo上学.png

    RabbitMQ有以下几种工作模式 :

    • Work queues
    • Publish/Subscribe
    • Routing
    • Topic
    • Headers
    • RPC

    Topic

    模型图
    [亚武de小文]Topic模型图.png

    Producer:生产者,是发消息方。
    Exchanger:路由器交换机,此处类型为topic,它会把消息的路由键取出来,与绑定它的队列做路由键匹配。如果队列关心的路由键能匹配上消息的路由键,则将这个消息投递到这个队列中。
    Queue01:一个队列,它是关心orange颜色的队列。
    Queue02:一个队列,它是关心rabbit与lazy的队列。
    Consumer01:消费者,从Queue01中获取消息。
    Consumer01:消费者,从Queue02中获取消息。
    * 可以替代一个单词
    # 可以替换零个或多个单词

    主题交换机模式:
    1、每个消费者监听自己的队列,并且设置带通配符的routingkey。
    2、生产者将消息发给broker,由交换机根据routingkey来转发消息到指定的队列。

    参考代码
    生产者
    • 队列绑定交换机指定通配符:【统配符规则】 中间以“.”分隔。符号#可以匹配多个词,符号*可以匹配一个词语。
      声明交换机,指定topic类型:

    • Producer.java

      package com.yawu.xiaowen.topic;
      
      import com.rabbitmq.client.BuiltinExchangeType;
      import com.rabbitmq.client.Channel;
      import com.rabbitmq.client.Connection;
      import com.rabbitmq.client.ConnectionFactory;
      import org.slf4j.Logger;
      import org.slf4j.LoggerFactory;
      
      /**
       * Topic交换机
       * 生产者
       *
       * @author yawu
       * @date 2019.06.30
       */
      public class Producer {
          private static final Logger LOGGER = LoggerFactory.getLogger(Producer.class);
          private static final String EXCHANGE_NAME = "mq_topic_exchange";
      
          public static void main(String[] args) {
              try {
                  // RabbitMQ建立连接的管理器
                  ConnectionFactory factory = new ConnectionFactory();
                  // 设置服务器地址
                  factory.setHost("127.0.0.1");
                  factory.setUsername("guest");
                  factory.setPassword("guest");
      
                  // 创建一个连接
                  Connection connection = factory.newConnection();
                  // 创建一个信道
                  Channel channel = connection.createChannel();
      
                  String message = "开始发送信息-topic交换机";
      
                  //声明一个TOPIC类型的交换机
                  channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
      
                  // 向交换机发送消息
                  // 第一种情况:发送【*.orange.*】通配符消息
                  channel.basicPublish(EXCHANGE_NAME, "like.orange.color", null, message.getBytes("UTF-8"));
                  // 第二种情况:发送【lazy.#】通配符消息
      //            channel.basicPublish(EXCHANGE_NAME, "lazy.boy.girl", null, message.getBytes("UTF-8"));
      
                  LOGGER.info("消息发送成功:{}", message);
                  channel.close();
                  connection.close();
      
              } catch (Exception e) {
                  e.printStackTrace();
              }
          }
      
      }
      
      
    消费者
    • Consumer01.java
      package com.yawu.xiaowen.topic;
      
      import com.rabbitmq.client.*;
      import org.slf4j.Logger;
      import org.slf4j.LoggerFactory;
      
      import java.io.IOException;
      
      /**
       * Topic交换机
       * 消费者01
       * @date 2019.06.30
       * @author yawu
       */
      public class Consumer01 {
          private static final Logger LOGGER = LoggerFactory.getLogger(Producer.class);
          private static final String EXCHANGE_NAME = "mq_topic_exchange";
          private static final String QUEUE_NAME_01 = "mq_topic_queue_01";
          private static final String QUEUE_NAME_02 = "mq_topic_queue_02";
      
          public static void main(String[] args) {
              try {
                  ConnectionFactory factory = new ConnectionFactory();
                  factory.setHost("127.0.0.1");
                  Connection connection = factory.newConnection();
                  Channel channel = connection.createChannel();
      
                  //声明一个Topic类型的交换机
                  channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
      
                  // 声明两个队列
                  /**
                   * 【参数详解】
                   * queue:要创建的队列名
                   * durable:是否持久化。如果为true,可以在RabbitMQ崩溃后恢复消息
                   * exclusive:true表示一个队列只能被一个消费者占有并消费
                   * autoDelete:true表示服务器不在使用这个队列是会自动删除它
                   * arguments:其它参数
                   */
                  channel.queueDeclare(QUEUE_NAME_01, true, false, false, null);
                  channel.queueDeclare(QUEUE_NAME_02, true, false, false, null);
      
                  final String ROUTING_KEY_ORANGE = "*.orange.*";
                  final String ROUTING_KEY_LAZY = "lazy.#";
                  // 队列一对ORANGE感兴趣,匹配  XXX.orange.XXX 的消息
                  channel.queueBind(QUEUE_NAME_01, EXCHANGE_NAME, ROUTING_KEY_ORANGE);
                  // 队列二对LAZY感兴趣,匹配  lazy.XXX.XXX.XXX
                  channel.queueBind(QUEUE_NAME_02, EXCHANGE_NAME, ROUTING_KEY_LAZY);
      
                  Consumer consumer = new DefaultConsumer(channel) {
                      @Override
                      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                          final String message = new String(body, "UTF-8");
                          LOGGER.info("队列一收到消息:{}", message);
                      }
                  };
      
                  // 队列一确认消息
                  channel.basicConsume(QUEUE_NAME_01, true, consumer);
      
              } catch (Exception e) {
                  LOGGER.error("an exception was occurred , caused by :{}", e.getMessage());
              }
          }
      }
      
    • Consumer02.java
      package com.yawu.xiaowen.topic;
      
      import com.rabbitmq.client.*;
      import org.slf4j.Logger;
      import org.slf4j.LoggerFactory;
      
      import java.io.IOException;
      
      /**
       * Topic交换机
       * 消费者02
       * @date 2019.06.30
       * @author yawu
       */
      public class Consumer02 {
          private static final Logger LOGGER = LoggerFactory.getLogger(Producer.class);
          private static final String EXCHANGE_NAME = "mq_topic_exchange";
          private static final String QUEUE_NAME_01 = "mq_topic_queue_01";
          private static final String QUEUE_NAME_02 = "mq_topic_queue_02";
      
          public static void main(String[] args) {
              try {
                  ConnectionFactory factory = new ConnectionFactory();
                  factory.setHost("127.0.0.1");
                  Connection connection = factory.newConnection();
                  Channel channel = connection.createChannel();
      
                  //声明一个Topic类型的交换机
                  channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
      
                  // 声明两个队列
                  /**
                   * 【参数详解】
                   * queue:要创建的队列名
                   * durable:是否持久化。如果为true,可以在RabbitMQ崩溃后恢复消息
                   * exclusive:true表示一个队列只能被一个消费者占有并消费
                   * autoDelete:true表示服务器不在使用这个队列是会自动删除它
                   * arguments:其它参数
                   */
                  channel.queueDeclare(QUEUE_NAME_01, true, false, false, null);
                  channel.queueDeclare(QUEUE_NAME_02, true, false, false, null);
      
                  final String ROUTING_KEY_ORANGE = "*.orange.*";
                  final String ROUTING_KEY_LAZY = "lazy.#";
                  // 队列一对ORANGE感兴趣,匹配  XXX.orange.XXX 的消息
                  channel.queueBind(QUEUE_NAME_01, EXCHANGE_NAME, ROUTING_KEY_ORANGE);
                  // 队列二对LAZY感兴趣,匹配  lazy.XXX.XXX.XXX
                  channel.queueBind(QUEUE_NAME_02, EXCHANGE_NAME, ROUTING_KEY_LAZY);
      
                  Consumer consumer = new DefaultConsumer(channel) {
                      @Override
                      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                          final String message = new String(body, "UTF-8");
                          LOGGER.info("队列二收到消息:{}", message);
                      }
                  };
      
                  //队列一确认消息
                  channel.basicConsume(QUEUE_NAME_02, true, consumer);
      
              } catch (Exception e) {
                  LOGGER.error("an exception was occurred , caused by :{}", e.getMessage());
              }
          }
      }
      
      
    测试及运行分析
    1. 启动生产者服务:(先以第一种情况*.orange.*发送)

      启动生产者服务.png
    2. 打开RabbitMQ的管理界面,查看绑定关系


      查看绑定关系.png
    3. 发现第一种情况自动匹配到了Queue01


      匹配队列01.png
    4. 【同理】,第二种情况此处略过。
      启动消费者服务,对消息进行消费


      启动消费者.png
      消息消费.png
    5. 至此,Topic主题通配符交换机学习结束。

    相关文章

      网友评论

        本文标题:Rabbitmq打怪升级之路(十一)Topic-主题通配符模式

        本文链接:https://www.haomeiwen.com/subject/hglbcctx.html