美文网首页
交换器类型 ~ 消息中间件之六

交换器类型 ~ 消息中间件之六

作者: 喏喏2021 | 来源:发表于2022-01-30 23:01 被阅读0次

1. 主要类型

交换器主要有4种类型:fanout、direct、topic、headers
下面我们挨个来看每一个类型

2. fanout

把所有发送该路由的消息,都路由到所有绑定的队列中。
下面我们模拟了3个队列

public class FanoutTest {
       //定义一些常量
   private static final String EXCHANGE_NAME = "exchange_basic_fanout"; //这里是发送消息的路由
   private static final String ROUTING_KEY_1 = "routingkey_basic_1"; //分别设置了3个不同的routingkey
   private static final String ROUTING_KEY_2 = "routingkey_basic_2";
   private static final String ROUTING_KEY_3 = "routingkey_basic_3";
   private static final String QUEUE_NAME_1 = "queue_basic_1";//这里我们将创建3个不同的队列
   private static final String QUEUE_NAME_2 = "queue_basic_2";
   private static final String QUEUE_NAME_3 = "queue_basic_3"
   private static final int PORT = 5672;

       //发送消息
   public static void send() throws Exception {
               //创建连接工厂
       ConnectionFactory factory = new ConnectionFactory();
               //设置工厂相关属性
       factory.setHost("localhost");
       factory.setPort(PORT);
       factory.setUsername("root");
       factory.setPassword("root");
       // 创建连接
       Connection conn = factory.newConnection();
       //创建3个不同的队列,使用不同的routing_key来绑定,
       createChannel(conn,QUEUE_NAME_1,ROUTING_KEY_1);
       createChannel(conn,QUEUE_NAME_2,ROUTING_KEY_2);
       createChannel(conn,QUEUE_NAME_3,ROUTING_KEY_3);
       //连接关闭
       conn.close();

   }
   //通过参数创建channel
   public static void createChannel(Connection conn, String queueName,String routingKey) throws Exception {
       // 创建信道,一个连接上可以创建多个通过
       Channel channel1 = conn.createChannel();
       // 创建一个fanout类型,持久化、非自动删除的交换器
       channel1.exchangeDeclare(EXCHANGE_NAME, "fanout", true, false, null);
       // 创建一个持久化、非排他的、非自动删除的队列
       channel1.queueDeclare(queueName, true, false, false, null);
       // 将交换器与队列通过路由器绑定
       channel1.queueBind(queueName, EXCHANGE_NAME, routingKey);
       // 发送一条消息
       String message2 = "你好!";
       channel1.basicPublish(EXCHANGE_NAME, routingKey+"_4", MessageProperties.PERSISTENT_TEXT_PLAIN,
               message2.getBytes());
       // 通道关闭
       channel1.close();
   }
       //通过队列名,接收消息
   public static void receive(String queueName) throws IOException, TimeoutException, InterruptedException {
              //接收的方式稍有不同,这里我们可以指定多个地址,每个地址主要包括IP,和端口号
       Address[] addresses = new Address[] { new Address("localhost", PORT) };
       ConnectionFactory factory = new ConnectionFactory();
               //这里只要指定连接的用户名和密码即可
       factory.setUsername("root");
       factory.setPassword("root");
       Connection conn = factory.newConnection(addresses);
               //这里创建一个接收消息的通道
       final Channel channel = conn.createChannel();
               //这里我们实现了一个匿名的接口
       Consumer consumer = new DefaultConsumer(channel) {

           @Override
           public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                   throws IOException {
                               //这里我们打印一下接收消息的内容
               System.out.println("recv,收到消息:" + new String(body) + ",consumerTag:" + consumerTag);
                               //对通道应答,表示已成功接收
               channel.basicAck(envelope.getDeliveryTag(), false);
           }

       };
               //启动消息接收过程
       channel.basicConsume(queueName, consumer);
       TimeUnit.SECONDS.sleep(5);
       channel.close(); //通道关闭
       conn.close(); //连接关闭
   }

   public static void main(String[] args) throws Exception {
              //创建了3个队列,并分别发送了消息
       send();
               //3个队列,分别接收消息
       receive(QUEUE_NAME_1);
       receive(QUEUE_NAME_2);
       receive(QUEUE_NAME_3);
   }
}

下面是我们最终得到的输出结果
//recv,收到消息:你好!,consumerTag:amq.ctag-e3sO2vQPearf8fD5TKKXgQ
//recv,收到消息:你好!,consumerTag:amq.ctag-e3sO2vQPearf8fD5TKKXgQ
//recv,收到消息:你好!,consumerTag:amq.ctag-e3sO2vQPearf8fD5TKKXgQ
//recv,收到消息:你好!,consumerTag:amq.ctag-PSzOOVxdI0jMFJZxdR6LHA
//recv,收到消息:你好!,consumerTag:amq.ctag-PSzOOVxdI0jMFJZxdR6LHA
//recv,收到消息:你好!,consumerTag:amq.ctag-PSzOOVxdI0jMFJZxdR6LHA
//recv,收到消息:你好!,consumerTag:amq.ctag-R3fJrsvLuq6SAIF0u3XB3w
//recv,收到消息:你好!,consumerTag:amq.ctag-R3fJrsvLuq6SAIF0u3XB3w
//recv,收到消息:你好!,consumerTag:amq.ctag-R3fJrsvLuq6SAIF0u3XB3w

通过结果我们可以看出,每个队列都收到了3个消息,也就是完全忽略了routing_key信息,
使用时,我们要格外关注一下交换器的类型,不能只关注它的routing_key信息,不然可能会有重复接收消息的情况发生
未完待...

相关文章

网友评论

      本文标题:交换器类型 ~ 消息中间件之六

      本文链接:https://www.haomeiwen.com/subject/ruxahrtx.html