Exchanges

Author: 快点给我想个名 | Published: 2019-06-26 22:36
    exchange

    A client publishes a message to an exchange, and the exchange uses the message's routing key to route it to the matching queue(s).

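    Exchange properties
    • name: the exchange name
    • type: the exchange type: direct, topic, fanout, or headers
    • durability: whether the exchange is persisted; true means durable
    • auto delete: once the last queue bound to the exchange is removed, the exchange is deleted automatically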
    Direct Exchange

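    Every message sent to a Direct Exchange is forwarded to the queue(s) bound with a key equal to the message's routing key.
    Direct mode can also use RabbitMQ's built-in default exchange, in which case no explicit binding is needed. The routing key must match exactly for a queue to receive the message; otherwise the message is discarded.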


    Direct Exchange.png

    With the default exchange, if the routing key is KEY, the queue must also be named KEY for the message to be delivered to it.
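The exact-match rule above can be sketched as a small in-memory model. This is only an illustration of the routing rule, not RabbitMQ's implementation; the class `DirectRoutingSketch` and its methods are hypothetical names introduced here, while the queue name and key are taken from the examples in this article.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of direct routing (not RabbitMQ code).
class DirectRoutingSketch {
    // binding key -> names of the queues bound with that key
    private final Map<String, List<String>> bindings = new HashMap<>();

    void bind(String queue, String bindingKey) {
        bindings.computeIfAbsent(bindingKey, k -> new ArrayList<>()).add(queue);
    }

    // Returns the queues that receive the message; an empty list means it is dropped.
    List<String> route(String routingKey) {
        List<String> queues = bindings.get(routingKey);
        return queues == null ? Collections.<String>emptyList() : queues;
    }

    public static void main(String[] args) {
        DirectRoutingSketch exchange = new DirectRoutingSketch();
        exchange.bind("queueName", "routingkey");
        System.out.println(exchange.route("routingkey")); // [queueName]
        System.out.println(exchange.route("routingKey")); // [] (case differs, no match)
    }
}
```

The comparison is a plain string equality check, which is why a key that differs only in case is not delivered.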

    • Producer
    public class Producer {
    
        public static void main(String[] args) throws IOException, TimeoutException {
    
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("192.168.6.99");
            factory.setPort(5672);
            factory.setVirtualHost("/");
    
            Connection connection = factory.newConnection();
    
            Channel channel = connection.createChannel();
    
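            /**
             * Publishes to the named exchange "exchangeName". If the exchange name were ""
             * (the default exchange), the message would instead be delivered to the queue
             * whose name equals the routing key.
             */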
            channel.basicPublish("exchangeName","routingkey",null,"hello world".getBytes());
    
            channel.close();
            connection.close();
        }
    }
    
    
    • Consumer
    public class Consumer {
    
        public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("192.168.6.99");
            factory.setPort(5672);
            factory.setVirtualHost("/");
    
            Connection connection = factory.newConnection();
    
            Channel channel = connection.createChannel();
    
            channel.exchangeDeclare("exchangeName","direct",true,false,false,null);
    
            channel.queueDeclare("queueName", true, false, false, null);
    
            channel.queueBind("queueName","exchangeName","routingkey");
    
            // QueueingConsumer was removed in amqp-client 5.x; a DeliverCallback replaces the polling loop.
            DeliverCallback onDelivery = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(" Received '" + message + "'");
            };

            // autoAck = true; the last argument is invoked if the consumer is cancelled.
            channel.basicConsume("queueName", true, onDelivery, consumerTag -> { });
        }
    }
    
    Topic Exchange

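    Every message sent to a Topic Exchange is forwarded to all queues whose bound topic pattern matches the message's routing key.
    The exchange performs a wildcard match between the routing key and each pattern, so every queue must be bound with a topic pattern.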


    Topic Exchange.png
    • Producer
    public class Producer {
    
        public static void main(String[] args) throws IOException, TimeoutException {
    
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("192.168.6.99");
            factory.setPort(5672);
            factory.setVirtualHost("/");
    
            Connection connection = factory.newConnection();
    
            Channel channel = connection.createChannel();
            
            channel.basicPublish("exchangeNameTopic","routingkey.123",null,"hello world".getBytes());
    
            channel.close();
            connection.close();
        }
    }
    
    • Consumer
    public class Consumer {
    
        public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("192.168.6.99");
            factory.setPort(5672);
            factory.setVirtualHost("/");
    
            Connection connection = factory.newConnection();
    
            Channel channel = connection.createChannel();
    
            channel.exchangeDeclare("exchangeNameTopic","topic",true,false,false,null);
    
            channel.queueDeclare("queueName", true, false, false, null);
    
            channel.queueBind("queueName","exchangeNameTopic","routingkey.#");
    
            // QueueingConsumer was removed in amqp-client 5.x; a DeliverCallback replaces the polling loop.
            DeliverCallback onDelivery = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(" Received '" + message + "'");
            };

            // autoAck = true; the last argument is invoked if the consumer is cancelled.
            channel.basicConsume("queueName", true, onDelivery, consumerTag -> { });
        }
    }
    

    * matches exactly one word
    # matches zero or more words
    (words in a routing key are separated by dots)
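To make the two wildcards concrete, here is a minimal matcher for dot-separated keys. It is a sketch of the matching rule only, not the broker's actual algorithm; `TopicPatternSketch` and `matches` are hypothetical names introduced for illustration.

```java
// Hypothetical matcher illustrating the '*' and '#' rules (not RabbitMQ code).
class TopicPatternSketch {

    static boolean matches(String bindingPattern, String routingKey) {
        // limit -1 keeps trailing empty words, so "a." is two words
        return match(bindingPattern.split("\\.", -1), 0, routingKey.split("\\.", -1), 0);
    }

    private static boolean match(String[] pattern, int i, String[] key, int j) {
        if (i == pattern.length) {
            return j == key.length; // pattern used up: key must be used up too
        }
        if (pattern[i].equals("#")) {
            // '#' either matches zero words (skip it) or absorbs one more word
            return match(pattern, i + 1, key, j)
                    || (j < key.length && match(pattern, i, key, j + 1));
        }
        if (j == key.length) {
            return false; // '*' or a literal word needs a word to consume
        }
        if (pattern[i].equals("*") || pattern[i].equals(key[j])) {
            return match(pattern, i + 1, key, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("routingkey.#", "routingkey.123")); // true
        System.out.println(matches("routingkey.#", "routingkey"));     // true, '#' matches zero words
        System.out.println(matches("routingkey.*", "routingkey.a.b")); // false, '*' is exactly one word
    }
}
```

With the binding `routingkey.#` used in the consumer above, the producer's key `routingkey.123` matches; a binding of `routingkey.*` would also match it, but would reject `routingkey.a.b`.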

    Fanout Exchange

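    A Fanout Exchange ignores the routing key; queues only need to be bound to the exchange.
    Every message sent to the exchange is forwarded to all queues bound to it.
    Because no routing-key matching is involved, fanout is the fastest exchange type at forwarding messages.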
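As a rough illustration of the broadcast behaviour described above, the following in-memory model returns every bound queue regardless of the routing key. It is not RabbitMQ code; `FanoutRoutingSketch` and the queue names `q1` and `q2` are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory model of fanout routing (not RabbitMQ code).
class FanoutRoutingSketch {
    private final List<String> boundQueues = new ArrayList<>();

    void bind(String queue) {
        boundQueues.add(queue);
    }

    // The routing key is accepted but never inspected: every bound queue gets a copy.
    List<String> route(String routingKey) {
        return new ArrayList<>(boundQueues);
    }

    public static void main(String[] args) {
        FanoutRoutingSketch exchange = new FanoutRoutingSketch();
        exchange.bind("q1");
        exchange.bind("q2");
        System.out.println(exchange.route(""));         // [q1, q2]
        System.out.println(exchange.route("anything")); // [q1, q2]
    }
}
```

This is why the producer below can publish with an empty routing key.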


    Fanout Exchange.png
    • Producer
    public class Producer {
    
        public static void main(String[] args) throws IOException, TimeoutException {
    
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("192.168.6.99");
            factory.setPort(5672);
            factory.setVirtualHost("/");
    
            Connection connection = factory.newConnection();
    
            Channel channel = connection.createChannel();
    
            channel.basicPublish("exchangeNameFanout","",null,"hello world".getBytes());
    
            channel.close();
            connection.close();
        }
    }
    
    • Consumer
    public class Consumer {
    
        public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("192.168.6.99");
            factory.setPort(5672);
            factory.setVirtualHost("/");
    
            Connection connection = factory.newConnection();
    
            Channel channel = connection.createChannel();
    
            channel.exchangeDeclare("exchangeNameFanout","fanout",true,false,false,null);
    
            channel.queueDeclare("queueName", true, false, false, null);
    
            channel.queueBind("queueName","exchangeNameFanout","");
    
            // QueueingConsumer was removed in amqp-client 5.x; a DeliverCallback replaces the polling loop.
            DeliverCallback onDelivery = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(" Received '" + message + "'");
            };

            // autoAck = true; the last argument is invoked if the consumer is cancelled.
            channel.basicConsume("queueName", true, onDelivery, consumerTag -> { });
        }
    }
    
