美文网首页技术篇
RabbitMQ交换机详解

RabbitMQ交换机详解

作者: 若兮缘 | 来源:发表于2019-03-12 08:43 被阅读18次

    交换机概念

    Exchange:交换机,接收消息,并根据路由键转发消息到绑定的队列
    如图为官网提供的模型,蓝色框表示Send Message,Client端把消息投递到Exchange上,通过RoutingKey路由关系将消息路由到指定的队列,绿色框代表Receive Message,Client端和队列建立监听,然后去接收消息。红色框代表RabbitMQ Server,黄色框表示RoutingKey,即Exchange和Queue需要建立绑定关系。

    交换机属性

    • Name: 交换机名称
    • Type: 交换机类型,direct、topic、 fanout、 headers
    • Durability: 是否需要持久化
    • Auto Delete: 当最后一个绑定到Exchange上的队列删除后,自动删除该Exchange
    • Internal: 当前Exchange是否用于RabbitMQ内部使用,默认为False
    • Arguments: 扩展参数,用于扩展AMQP协议定制化使用

    Direct Exchange

    直连方式,所有发送到 Direct Exchange 的消息被转发到RouteKey中指定的Queue
    注意:Direct模式可以使用RabbitMQ自带的Exchange:default Exchange,所以不需要将Exchange进行任何绑定(binding)操作,消息传递时,RouteKey必须完全匹配才会被队列接收,否则该消息会被抛弃。

    看一下 Direct Exchange 的图解,其实意思就是说指定了RoutingKey的消息会被投递到绑定关系与该key值相同的队列上。

    生产端

    指定投递的Exchange和相应的RontingKey进行发送消息

    public class Producer4DirectExchange {
    
        public static void main(String[] args) throws Exception {
            
            //1 创建ConnectionFactory
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("192.168.43.157");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
            
            //2 创建Connection
            Connection connection = connectionFactory.newConnection();
            
            //3 创建Channel
            Channel channel = connection.createChannel();  
            
            //4 声明
            String exchangeName = "test_direct_exchange";
            String routingKey = "test.direct";
            
            //5 发送
            String msg = "Hello World RabbitMQ 4  Direct Exchange Message ... ";
            channel.basicPublish(exchangeName, routingKey , null , msg.getBytes()); 
            
            //6 关闭连接
            channel.close();
            connection.close();
        }   
    }
    
    消费端
    • 声明一个直连交换机
    • 声明队列
    • 建立交换机和队列的绑定关系
    • 消费者监听队列,消费消息
    public class Consumer4DirectExchange {
    
        public static void main(String[] args) throws Exception {
            
            //1 创建ConnectionFactory
            ConnectionFactory connectionFactory = new ConnectionFactory() ;  
            connectionFactory.setHost("192.168.43.157");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
            // 1.设置是否自动重连(网络闪断时)      2.每3秒重连一次
            connectionFactory.setAutomaticRecoveryEnabled(true);
            connectionFactory.setNetworkRecoveryInterval(3000);
            //2 创建Connection
            Connection connection = connectionFactory.newConnection();
            //3 创建Channel
            Channel channel = connection.createChannel();  
            //4 声明
            String exchangeName = "test_direct_exchange";
            String exchangeType = "direct";
            String queueName = "test_direct_queue";
            String routingKey = "test.direct";
            
            //表示声明了一个交换机
            //参数分别为:name,type,durable,autoDelete,internal,arguments
            channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
            //表示声明了一个队列
            //参数分别为:name,durable,exclusive,autoDelete,arguments
            channel.queueDeclare(queueName, false, false, false, null);
            //建立一个绑定关系(exchange和queue)
            channel.queueBind(queueName, exchangeName, routingKey);
            
            //细节一:同一个队列可以绑定多个值
            channel.queueBind(queueName, exchangeName, "666");
            //细节二:不同队列可以绑定相同的RoutingKey
            String queueName2 = "test_direct_queue2";
            channel.queueDeclare(queueName2, false, false, false, null);
            channel.queueBind(queueName2, exchangeName, routingKey);
            
            //创建消费者
            QueueingConsumer consumer = new QueueingConsumer(channel);
            //参数:队列名称、是否自动ACK、Consumer
            channel.basicConsume(queueName, true, consumer);
            //监听多个队列
            channel.basicConsume(queueName2, true, consumer);
            // 循环获取消息  
            while(true){  
                //获取消息,如果没有消息,这一步将会一直阻塞  
                Delivery delivery = consumer.nextDelivery();  
                String msg = new String(delivery.getBody());    
                System.out.println("收到消息:" + msg);  
            } 
        }
    }
    
    运行说明

    先启动消费端,刷新管控台,在Exchange目录下可以看到我们声明的exchange以及type

    点击该exchange可以看到和队列的绑定关系

    然后启动生产端,此时消费端控制台进行了打印,共消费了两条消息,说明监听的两个队列都接收到了消息。

    收到消息:Hello World RabbitMQ 4  Direct Exchange Message ... 
    收到消息:Hello World RabbitMQ 4  Direct Exchange Message ... 
    

    当我们修改生产端的routingKey值为:666时,那么只有队列一接收到消息并被消费者消费。
    如果修改值为:test.direct111,此时在启动生产端,消费端就收不到消息了,这就是直连的方式。

    Topic Exchange

    所有发送到 Topic Exchange 的消息被转发到所有关心RouteKey中指定Topic的Queue上
    Exchange将RouteKey和某Topic进行模糊匹配,此时队列需要绑定一个Topic

    注意:可以使用通配符进行模糊匹配

    图解

    看一下 Topic Exchange 的图解,意思是说我们有4个队列,它们的绑定关系分别是usa.##.news#.weathereurope.#
    对于第一个队列而言,它只关系以usa.开头的相关消息。比如发送的第一条消息是usa.news,那么这条消息会同时匹配上队列1和队列2,所以两个队列都能接收到,其他消息也是同样的规则,这里就不继续展开说了。

    生产端

    指定投递的Exchange和相应的RontingKey进行发送消息

            //省略创建连接和通道...
            //4 声明
            String exchangeName = "test_topic_exchange";
            String routingKey1 = "user.save";
            String routingKey2 = "user.update";
            String routingKey3 = "user.delete.abc";
            
            //5 发送
            String msg = "Hello World RabbitMQ 4 Topic Exchange Message ...";
            channel.basicPublish(exchangeName, routingKey1 , null , msg.getBytes()); 
            channel.basicPublish(exchangeName, routingKey2 , null , msg.getBytes());    
            channel.basicPublish(exchangeName, routingKey3 , null , msg.getBytes());
            //关闭连接...
    
    消费端

    声明一个Topic Exchange,声明队列,建立交换机和队列的绑定关系

             //省略创建连接和通道...     
            //4 声明
            String exchangeName = "test_topic_exchange";
            String exchangeType = "topic";
            String queueName = "test_topic_queue";
            String routingKey = "user.#";
            // 声明Topic Exchange
            channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
            // 声明队列
            channel.queueDeclare(queueName, false, false, false, null);
            // 建立交换机和队列的绑定关系
            channel.queueBind(queueName, exchangeName, routingKey);
            
            //创建消费者
            QueueingConsumer consumer = new QueueingConsumer(channel);
            //参数:队列名称、是否自动ACK、Consumer
            channel.basicConsume(queueName, true, consumer);  
            
            //循环获取消息... 
    
    运行说明

    先启动消费端,同样可以在管控台可以看到我们新声明的exchange以及它的绑定队列,这里不再细说。然后启动生产端,此时消费端控制台进行了打印,共消费了三条消息,说明三条消息都和队列指定的Topic匹配上了,因为使用的是 # 匹配

    收到消息:Hello World RabbitMQ 4 Topic Exchange Message ...
    收到消息:Hello World RabbitMQ 4 Topic Exchange Message ...
    收到消息:Hello World RabbitMQ 4 Topic Exchange Message ...
    

    将消费端指定的RoutingKey进行修改:routingKey = "user.*";
    然后重新启动消费端,注意此时该队列绑定了两个RoutingKey,那么生产者无论匹配到哪个都可以将消息投递到该队列中。我们在管控台将原来的路由规则进行解绑,如图所示,点击Unbind按钮。

    再次启动生产端,此时消费端控制台打印了两条消息,说明最后一条消息:user.delete.abc没有匹配上,因为使用的是 * 匹配,这就是 Topic Exchange 的路由方式。

    Fanout Exchange

    • 不处理路由键,只需要简单的将队列绑定到交换机上
    • 发送到交换机的消息都会被转发到与该交换机绑定的所有队列上
    • Fanout交换机转发消息是最快的(性能最好)

    来看一下 Fanout Exchange 的图解,意思就是消息不走任何的路由规则,只有队列和交换机有绑定关系就能收到消息

    生产端

    不设置路由键直接发送消息到Fanout Exchange

            //省略创建连接和通道...
            //4 声明
            String exchangeName = "test_fanout_exchange";
            //5 发送
            for(int i = 0; i < 5; i ++) {
                String msg = "Hello World RabbitMQ 4 FANOUT Exchange Message ...";
                //不设置路由键或者设置任意内容均可
                channel.basicPublish(exchangeName, "", null , msg.getBytes());          
            }
            //关闭连接...
    
    消费端

    声明一个Fanout Exchange,声明队列,建立交换机和队列的绑定关系,绑定关系不使用RoutingKey

            //省略创建连接和通道...      
            //4 声明
            String exchangeName = "test_fanout_exchange";
            String exchangeType = "fanout";
            String queueName = "test_fanout_queue";
            String routingKey = ""; //不设置路由键
            //声明交换机、队列、绑定关系
            channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
            channel.queueDeclare(queueName, false, false, false, null);
            channel.queueBind(queueName, exchangeName, routingKey);
            
            //创建消费者
            QueueingConsumer consumer = new QueueingConsumer(channel);
            //参数:队列名称、是否自动ACK、Consumer
            channel.basicConsume(queueName, true, consumer);
            //循环获取消息... 
    

    先启动消费端,然后启动生产端,消息被成功消费,这种就是Fanout Exchange,它不走任何的路由规则,直接将消息路由到所有与它绑定的队列。

    收到消息:Hello World RabbitMQ 4 FANOUT Exchange Message ...
    收到消息:Hello World RabbitMQ 4 FANOUT Exchange Message ...
    收到消息:Hello World RabbitMQ 4 FANOUT Exchange Message ...
    收到消息:Hello World RabbitMQ 4 FANOUT Exchange Message ...
    收到消息:Hello World RabbitMQ 4 FANOUT Exchange Message ...
    

    还有一种交换机Headers Exchange,很少使用,是通过消息头进行路由的,通常我们都使用前三种。

    相关文章

      网友评论

        本文标题:RabbitMQ交换机详解

        本文链接:https://www.haomeiwen.com/subject/bqqipqtx.html