美文网首页spring boot亚武学习
Rabbitmq打怪升级之路(十二)Headers-头交换机模式

Rabbitmq打怪升级之路(十二)Headers-头交换机模式

作者: 亚武de小文 | 来源:发表于2019-07-01 16:48 被阅读0次

    简书:亚武de小文 【原创:转载请注明出处】

    头交换机模式(Headers)

    LengToo上学.png
    RabbitMQ有以下几种工作模式 :
    • Work queues
    • Publish/Subscribe
    • Routing
    • Topic
    • Headers
    • RPC

    Header
    模型图
    [亚武de小文]Headers模型图.png

    消息header数据里有一个特殊的键x-match,它有两个值:

    • all: 默认值。一个传送消息的header里的键值对和交换机的header键值对全部匹配,才可以路由到对应交换机
    • any: 一个传送消息的header里的键值对和交换机的header键值对任意一个匹配,就可以路由到对应交换机

    头交换机和主题交换机类似,区别在于:Topic路由值是基于路由键,Headers的路由值基于消息的header数据。 主题交换机路由键只有是字符串,而头交换机可以是整型和哈希值。

    参考代码
    生产者
    • Producer.java
      package com.yawu.xiaowen.header;
      
      import com.rabbitmq.client.*;
      import org.slf4j.Logger;
      import org.slf4j.LoggerFactory;
      
      import java.util.Map;
      
      /**
       * Header交换机
       * 生产者
       *
       * @author yawu
       * @date 2019.07.01
       */
      public class Producer {
          private static final Logger LOGGER = LoggerFactory.getLogger(Producer.class);
          private static final String EXCHANGE_NAME = "mq_header_exchange";
      
          public static void execute(Map<String, Object> headerMap) {
              try {
                  // RabbitMQ建立连接的管理器
                  ConnectionFactory factory = new ConnectionFactory();
                  // 设置服务器地址
                  factory.setHost("127.0.0.1");
                  factory.setUsername("guest");
                  factory.setPassword("guest");
      
                  // 创建一个连接
                  Connection connection = factory.newConnection();
                  // 创建一个信道
                  Channel channel = connection.createChannel();
      
                  String message = "发送信息-headers交换机";
      
                  //声明一个Header类型的交换机
                  channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.HEADERS);
      
                  // 生成发送消息的属性
                  AMQP.BasicProperties props = new AMQP.BasicProperties
                          .Builder()
                          .headers(headerMap)
                          .build();
      
                  // 向交换机发送消息
                  channel.basicPublish(EXCHANGE_NAME, "like.orange.color", null, message.getBytes("UTF-8"));
      
                  LOGGER.info("消息发送成功:{}", message);
                  channel.close();
                  connection.close();
      
              } catch (Exception e) {
                  e.printStackTrace();
              }
          }
      
      }
      
    消费者
    • Consumer01.java
      package com.yawu.xiaowen.header;
      
      import com.rabbitmq.client.*;
      import org.slf4j.Logger;
      import org.slf4j.LoggerFactory;
      
      import java.io.IOException;
      import java.util.Map;
      
      /**
       * Header交换机
       * 消费者
       *
       * @author yawu
       * @date 2019.07.01
       */
      public class Consumer01 {
          private static final Logger LOGGER = LoggerFactory.getLogger(Producer.class);
          private static final String EXCHANGE_NAME = "mq_header_exchange";
      
          public static void execute(Map<String, Object> myHeaderMap) {
              try {
                  ConnectionFactory factory = new ConnectionFactory();
                  factory.setHost("127.0.0.1");
                  Connection connection = factory.newConnection();
                  Channel channel = connection.createChannel();
      
                  //声明一个Headers类型的交换机
                  channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.HEADERS);
      
                  // 声明一个临时队列
                  String queue_name = channel.queueDeclare().getQueue();
      
                  // 队列绑定时需要指定参数,注意虽然不需要路由键但仍旧不能写成null,需要写成空字符串""
                  channel.queueBind(queue_name, EXCHANGE_NAME, "", myHeaderMap);
      
                  LOGGER.info("【Consumer01:" + myHeaderMap + "】 等待消息...");
                  Consumer consumer = new DefaultConsumer(channel) {
                      @Override
                      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                          final String message = new String(body, "UTF-8");
                          LOGGER.info("【Consumer01:" + myHeaderMap + "】接收到的消息 '" + properties.getHeaders() + "':'" + message + "'");
                      }
                  };
      
                  // 队列一确认消息
                  channel.basicConsume(queue_name, true, consumer);
      
              } catch (Exception e) {
                  LOGGER.error("an exception was occurred , caused by :{}", e.getMessage());
              }
          }
      }
      
      
    测试工具
    • HearTest.java

      package com.yawu.xiaowen.header;
      
      
      import org.junit.Test;
      
      import java.util.HashMap;
      import java.util.Map;
      import java.util.concurrent.ExecutorService;
      import java.util.concurrent.Executors;
      
      public class HeaderTest {
      
          private ExecutorService executorService = Executors.newFixedThreadPool(10);
      
          @Test
          public void header() throws InterruptedException {
      
              // 消费者1:绑定 health=Nice,mentality=Good
              executorService.submit(() -> {
                  Map<String, Object> headerMap = new HashMap<>();
                  headerMap.put("health", "Nice");
                  headerMap.put("mentality", "Good");
                  headerMap.put("x-match", "all");
                  Producer.execute(headerMap);
              });
      
              // 消费者2:绑定  health=Nice,mentality=Bad
              executorService.submit(() -> {
                  Map<String, Object> headerMap = new HashMap<>();
                  headerMap.put("health", "Nice");
                  headerMap.put("mentality", "Bad");
                  headerMap.put("x-match", "any");
                  Producer.execute(headerMap);
              });
      
              // 消费者3:绑定  health=Terrible,mentality=Good
              executorService.submit(() -> {
                  Map<String, Object> headerMap = new HashMap<>();
                  headerMap.put("health", "Terrible");
                  headerMap.put("mentality", "Good");
                  headerMap.put("x-match", "all");
      //            headerMap.put("x-match","any");
                  Producer.execute(headerMap);
              });
      
              Thread.sleep(2 * 1000);
              System.out.println("=============消息01===================");
              // 生产者1 : health=Nice,mentality=Good,x-match=all
              executorService.submit(() -> {
                  Map<String, Object> headerMap = new HashMap<>();
                  headerMap.put("health", "Nice");
                  headerMap.put("mentality", "Good");
      //            headerMap.put("x-match","all");
                  Consumer01.execute(headerMap);
              });
      
              Thread.sleep(5 * 100);
              System.out.println("=============消息02===================");
              // 生产者2 : health=Nice,x-match=any
              executorService.submit(() -> {
                  Map<String, Object> headerMap = new HashMap<>();
                  headerMap.put("health", "Nice");
      //            headerMap.put("x-match","any");
                  Consumer01.execute(headerMap);
              });
      
              Thread.sleep(5 * 100);
              System.out.println("=============消息03===================");
              // 生产者1 : health=Terrible,mentality=Bad,x-match=all
              executorService.submit(() -> {
                  Map<String, Object> headerMap = new HashMap<>();
                  headerMap.put("health", "Terrible");
                  headerMap.put("mentality", "Bad");
      //            headerMap.put("x-match","all");
                  Consumer01.execute(headerMap);
              });
      
              // sleep 10s
              Thread.sleep(10 * 1000);
          }
      }
      
    • 运行HeaderTest测试工具,结果如图:


      Headers模式测试结果.png
    • 至此,Headers交换机模式学习完毕

    相关文章

      网友评论

        本文标题:Rabbitmq打怪升级之路(十二)Headers-头交换机模式

        本文链接:https://www.haomeiwen.com/subject/mglbcctx.html