RabbitMQ入门-Routing直连模式

作者: Jackie_Zheng | 来源:发表于2017-08-07 22:46 被阅读81次

    Hello World模式,告诉我们如何一对一发送和接收消息;

    Work模式,告诉我们如何多管齐下高效的消费消息;
    Publish/Subscribe模式,告诉我们如何广播消息
    那么有没有灵活强一点的既可以高效消费,又可以同时送达多个消费者的模式?
    有,这就是Routing模式,我又称之为Direct直连模式。

    Routing模式

    routing模式.png
    • 一个生产者P,一个交换机X,多个消息队列Q以及多个消费者C

    • 在Exchange和Queue中,我们看到了不同的规则,也就是Routing Key

    显然从图中的说明,我们就知道这是一个log日志根据级别派发消息的例子。熟悉Log日志系统的应该都知道,一般的log系统分为error、info、warn和debug等。从图中我们可以看出,将日志级别为error的定向的派发到第一个消息队列,将error、warn和info级别的日志派发到第一个消息队列。

    该模型首先实现了定向派发,而不再是订阅模式那种广播式的派发。同一条消息既可以派发给一个Queue,也可以同时派发给两个或者多个Queue,这就是该模式的灵活之处。下面来看看实例

    发送端

    /**
     * Created by jackie on 17/8/7.
     */
    public class EmitLogDirect {
    
        private static final String EXCHANGE_NAME = "direct_logs";
    
        public static void main(String[] argv) throws Exception {
    
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("192.168.3.161");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
    
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
    
            String severity = getSeverity(argv);
            String message = getMessage(argv);
    
            channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + severity + "':'" + message + "'");
    
            channel.close();
            connection.close();
        }
    
        private static String getSeverity(String[] strings){
            if (strings.length < 1)
                return "info";
            return strings[0];
        }
    
        private static String getMessage(String[] strings){
            if (strings.length < 2)
                return "Hello World!";
            return joinStrings(strings, " ", 1);
        }
    
        private static String joinStrings(String[] strings, String delimiter, int startIndex) {
            int length = strings.length;
            if (length == 0 ) return "";
            if (length < startIndex ) return "";
            StringBuilder words = new StringBuilder(strings[startIndex]);
            for (int i = startIndex + 1; i < length; i++) {
                words.append(delimiter).append(strings[i]);
            }
            return words.toString();
        }
    }
    
    • String severity = getSeverity(argv);通过程序参数赋值给Routing Key,作为发送消息的规则

    • String message = getMessage(argv);通过程序参数赋值作为消息实体发送到Queue

    在run configurations中配置argv

    image.png

    *第一个参数是要绑定key的名称,第二个参数是要发送的消息内容

    • 运行后,可以在RabbitMQ管理应用中看到exchange,但是此时没有绑定queue,所以即使发送消息也没有queue会存储或者消费。
    image.png

    接收端

    /**
     * Created by jackie on 17/8/7.
     */
    public class ReceiveLogsDirect {
    
        private static final String EXCHANGE_NAME = "direct_logs";
    
        public static void main(String[] argv) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("192.168.3.161");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
    
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
            String queueName = channel.queueDeclare().getQueue();
    
            if (argv.length < 1){
                System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");
                System.exit(1);
            }
    
            for(String severity : argv){
                channel.queueBind(queueName, EXCHANGE_NAME, severity);
            }
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");
                }
            };
            channel.basicConsume(queueName, true, consumer);
        }
    }
    
    • channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);表示使用的exchange类型为Direct类型

    • 绑定的queue的名称也是通过program arguments指定的

    image.png

    这里两个参数info和error表示绑定了两个routing key,即如果发送routing key为info的消息该队列能接收到,如果发送routing key为error,该队列也能收到

    运行情况

    启动接收端代码,我们可以看到生成了Queue名称为amq.gen-ugjKo6t4y0PXPwoh3CeubA的队列,同时有routingKey=info和routingKey=error的绑定到了Exchange上。

    image.png

    这时候起送发送端给routingkey为info发送消息“hello world”,我们可以看到在接收端确实能够收到消息“hello world”,同理,这时候发送routingkey为error的消息,该队列同样能够接收到,因为队列同时绑定了两个routing key

    image.png

    这个就是Routing直连模式。
    如果您觉得阅读本文对您有帮助,请点一下“喜欢”按钮,您的“喜欢”将是我最大的写作动力!如果您想持续关注我的文章,请扫描二维码,关注JackieZheng的微信公众号,我会将我的文章推送给您,并和您一起分享我日常阅读过的优质文章。

    相关文章

      网友评论

        本文标题:RabbitMQ入门-Routing直连模式

        本文链接:https://www.haomeiwen.com/subject/rnlzlxtx.html