美文网首页
RabbitMQ(六)路由

RabbitMQ(六)路由

作者: 薛晨 | 来源:发表于2016-11-04 14:36 被阅读136次

    RabbitMQ官网中文版教程:

    http://rabbitmq.mr-ping.com/tutorials_with_python/[4]Routing.html

    上述教程示例为Python版,Java版及相应解释如下:

    生产者

    package com.xc.rabbit.routing;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    /**
     * Created by xc.
     */
    /**
     * Producer for the direct-exchange routing example.
     *
     * <p>Publishes one message per severity level to the {@code direct_logs}
     * exchange, using the severity itself as the routing key.
     */
    public class RoutingSendDirect {

        private static final String EXCHANGE_NAME = "direct_logs";

        // Routing keys; one message is published for each of them.
        private static final String[] routingKeys = new String[] {"info", "warning", "error"};

        /**
         * Connects to the broker on localhost:5672, declares the direct exchange,
         * publishes one message per routing key and then closes the channel and
         * the connection.
         *
         * @param args unused
         * @throws Exception if connecting, declaring, publishing or closing fails
         */
        public static void main(String[] args) throws Exception {

            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            factory.setPort(5672);
            factory.setUsername("rabbit");
            factory.setPassword("carrot");
            // Open a new connection to the broker.
            Connection connection = factory.newConnection();
            try {
                // Open a channel on that connection.
                Channel channel = connection.createChannel();
                // Declare the exchange as a "direct" exchange.
                channel.exchangeDeclare(EXCHANGE_NAME, "direct");
                // Publish one message per severity, routed by that severity.
                for (String severity : routingKeys) {
                    String message = "Send the message level : " + severity;
                    // Encode explicitly as UTF-8: the consumers decode the body with
                    // "UTF-8", whereas the no-arg getBytes() uses the platform charset.
                    channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));
                    System.out.println(" [x] Sent '" + severity + "':'" + message + "'");
                }
                channel.close();
            } finally {
                // Always release the connection, even when declaring or publishing fails.
                connection.close();
            }
        }
    }
    
    

    消费者1

    package com.xc.rabbitmq.routing;
    
    import com.rabbit.client.*;
    
    import java.io.IOException;
    
    /**
     * Created by xc.
     */
    public class ReceiveLogsDirect1 {
    
        // 交换器名称
        private static final String EXCHANGE_NAME = "direct_logs";
        // 路由关键字
        private static final String[] routingKey = new String[]{"info", "warning", "error"};
    
        public static void main(String[] args) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            factory.setPort(5672);
            factory.setUsername("rabbit");
            factory.setPassword("carrot");
            // 创建一个新的连接
            Connection connection = factory.newConnection();
            // 创建一个频道
            Channel channel = connection.createChannel();
            // 声明交换器
            channel.exchangeDeclare(EXCHANGE_NAME, "direct");
            // 获取匿名队列名称
            String queueName = channel.queueDeclare().getQueue();
            // 根据路由关键字进行多重绑定
            for (String severity : routingKey) {
                channel.queueBind(queueName, EXCHANGE_NAME, severity);
                System.out.println("ReceiveLogsDirect1 exchange : " + EXCHANGE_NAME +
                        ", queue : " + queueName + ", BindRoutingKey : " + severity);
    
            }
            System.out.println("ReceiveLogsDirect1 [*] Waiting for messages. To exit press CTRL + C");
    
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");
                }
            };
            channel.basicConsume(queueName, true, consumer);
        }
    }
    
    

    消费者2

    package com.xc.rabbit.routing;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    
    /**
     * Created by xc.
     */
    /**
     * Second consumer for the direct-exchange routing example.
     *
     * <p>Binds one server-named queue to the {@code direct_logs} exchange for the
     * "error" key only and prints every message it receives.
     */
    public class ReceiveLogsDirect2 {

        // Name of the exchange to bind to.
        private static final String EXCHANGE_NAME = "direct_logs";
        // Routing (binding) keys this consumer subscribes to.
        private static final String[] routingKey = {"error"};

        /**
         * Connects to the broker on localhost:5672, binds a queue for each key in
         * {@code routingKey} and starts consuming; the connection stays open so
         * deliveries keep arriving until the process is stopped.
         *
         * @param args unused
         * @throws Exception if connecting, declaring, binding or subscribing fails
         */
        public static void main(String[] args) throws Exception {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("localhost");
            connectionFactory.setPort(5672);
            connectionFactory.setUsername("rabbit");
            connectionFactory.setPassword("carrot");

            // Open a connection and a channel on it.
            Connection brokerConnection = connectionFactory.newConnection();
            final Channel amqpChannel = brokerConnection.createChannel();

            // Declare the exchange as a "direct" exchange.
            amqpChannel.exchangeDeclare(EXCHANGE_NAME, "direct");

            // Declare an anonymous queue and keep its generated name.
            final String anonymousQueue = amqpChannel.queueDeclare().getQueue();

            // Bind the queue once for every configured key.
            for (int i = 0; i < routingKey.length; i++) {
                String bindingKey = routingKey[i];
                amqpChannel.queueBind(anonymousQueue, EXCHANGE_NAME, bindingKey);
                String bindingInfo = "ReceiveLogsDirect2 exchange : " + EXCHANGE_NAME
                        + ", queue : " + anonymousQueue
                        + ", BindRoutingKey : " + bindingKey;
                System.out.println(bindingInfo);
            }
            System.out.println("ReceiveLogsDirect2 [*] Waiting for messages. To exit press CTRL + C");

            Consumer printingConsumer = new DefaultConsumer(amqpChannel) {
                @Override
                public void handleDelivery(String consumerTag,
                                           Envelope envelope,
                                           AMQP.BasicProperties properties,
                                           byte[] body) throws IOException {
                    // Decode the payload as UTF-8, then print it with its routing key.
                    String text = new String(body, "UTF-8");
                    String routedBy = envelope.getRoutingKey();
                    System.out.println(" [x] Received '" + routedBy + "':'" + text + "'");
                }
            };
            // Second argument is the auto-acknowledge flag.
            amqpChannel.basicConsume(anonymousQueue, true, printingConsumer);
        }
    }
    
    

    运行结果如下:

    由图可知,生产者发出的消息,根据不同的路由,发送到不同的队列,进而被不同的消费者接收。

    先跑消费者程序,再跑生产者程序。否则,生产者的消息到达交换器之后,如果没有队列连上交换器, 则消息被直接丢弃。

    注意:

    1. Bindings can take an extra routingKey parameter. To avoid confusion with the basic_publish parameter, we're going to call it a binding key.
      binding key和routing key是一回事,为了避免概念重复,channel.queueBind时叫binding key, channel.basicPublish时叫routing key。

    2. The routing algorithm behind a direct exchange is simple - a message goes to the queues whose binding key exactly matches the routing key of the message.
      direct交换器的路由规则很简单,消息会路由到binding key与routing key相同的队列。

    相关文章

      网友评论

          本文标题:RabbitMQ(六)路由

          本文链接:https://www.haomeiwen.com/subject/fqjiuttx.html