美文网首页
【译】RabbitMQ教程五

【译】RabbitMQ教程五

作者: maxwellyue | 来源:发表于2017-07-19 19:46 被阅读1558次

    内容来自:RabbitMQ Tutorials Java版


    Topics

    在上一个教程中我们改进了我们的日志系统:使用direct路由器替代了fanout路由器,从而可以选择性地接收日志。

    尽管使用direct路由器给我们的日志系统带了了改进,但仍然有一些限制:不能基于多种标准进行路由。

    在我们的日志系统中,我们可能不仅需要根据日志的严重级别来接收日志,而且有时想基于日志来源进行路由。如果你知道syslog这个Unix工具,你可能了解这个概念,sysylog会基于日志严重级别(info/warn/crit...)和设备(auth/cron/kern...)进行日志分发。

    如果我们可以监听来自corn的错误日志,同时也监听kern的所有日志,那么我们的日志系统就会更加灵活。

    为了实现这个功能,我们需要了解一个复杂的路由器:topic路由器。


    主题路由器(Topic Exchange)

    发送到topic路由器的消息的路由键routing_key不能任意给定:它必须是一些单词的集合,中间用点号.分割。这些单词可以是任意的,但通常会体现出消息的特征。一些有效的路由键示例:stock.usd.nysenyse.vmwquick.orange.rabbit。这些路由键可以包含很多单词,但路由键总长度不能超过255个字节。

    绑定键binding key也必须是这种形式。topic路由器背后的逻辑与direct路由器类似:以特定路由键发送的消息将会发送到所有绑定键与之匹配的队列中。但绑定键有两种特殊的情况:
    ①*(星号)仅代表一个单词
    ②#(井号)代表任意个单词
    下图可以很好地解释这两个符号的含义:

    `topic`路由器中的星号和井号

    对于上图的例子,我们将会发送描述动物的消息。这些消息将会以由三个单词组成的路由键发送。路由键中的第一个单词描述了速度,第二个描述了颜色,第三个描述了物种:<speed>.<colour>.<species>

    我们创建了三个绑定,Q1的绑定键为*.orange.*,Q2的绑定键有两个,分别是*.*.rabbitlazy.#

    上述绑定关系可以描述为:
    ①Q1关注所有颜色为orange的动物。
    ②Q2关注所有的rabbit,以及所有的lazy的动物。

    如果一个消息的路由键是quick.orange.rabbit,那么Q1和Q2都可以接收到,路由键是lazy.orange.elephant的消息同样如此。但是,路由键是quick.orange.fox的消息只会到达Q1,路由键是lazy.brown.fox的消息只会到达Q2。注意,路由键为lazy.pink.rabbit的消息只会到达Q2一次,尽管它匹配了两个绑定键。路由键为quick.brown.fox的消息因为不和任意的绑定键匹配,所以将会被丢弃。

    假如我们不按常理出牌:发送一个路由键只有一个单词或者四个单词的消息,像orange或者quick.orange.male.rabbit,这样的话,这些消息因为不和任意绑定键匹配,都将会丢弃。但是,lazy.orange.male.rabbit消息因为和lazy.#匹配,所以会到达Q2,尽管它包含四个单词。

    Topic exchange
    Topic exchange非常强大,可以实现其他任意路由器的功能。
    </br>当一个队列以绑定键#绑定,它将会接收到所有的消息,而无视路由键(实际是绑定键#匹配了任意的路由键)。----这和fanout路由器一样了。
    </br>当*#这两个特殊的字符不出现在绑定键中,Topic exchange就会和direct exchange类似了。


    放在一块

    我们将会在我们的日志系统中使用主题路由器Topic exchange,并假设所有的日志消息以两个单词<facility>.<severity>为路由键。

    代码和上个教程几乎一样。

    生产者EmitLogTopic.java

    
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.Channel;
    
    public class EmitLogTopic {
    
        private static final String EXCHANGE_NAME = "topic_logs";
    
        public static void main(String[] argv) {
            Connection connection = null;
            Channel channel = null;
            try {
                //建立连接和通道
                ConnectionFactory factory = new ConnectionFactory();
                factory.setHost("localhost");
                connection = factory.newConnection();
                channel = connection.createChannel();
    
                //声明路由器和路由器类型
                channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
    
                //定义路由键和消息
                String routingKey = "";
                String message = "msg.....";
    
                //发布消息
                channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
                System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
    
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                if (connection != null) {
                    try {
                        connection.close();
                    } catch (Exception ignore) {
                    }
                }
            }
        }
    }
    

    消费者ReceiveLogsTopic.java

    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    
    public class ReceiveLogsTopic {
    
        private static final String EXCHANGE_NAME = "topic_logs";
    
        public static void main(String[] argv) throws Exception {
            //建立连接和通道
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
    
            //声明路由器和路由器类型
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
            String queueName = channel.queueDeclare().getQueue();
    
            //
            String bingingKeys[] = {""};
    
            for (String bindingKey : bingingKeys) {
                channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
            }
    
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    
            //监听消息
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");
                }
            };
            channel.basicConsume(queueName, true, consumer);
        }
    }
    

    现在,可以动手实验了。
    开头提到的:日志严重级别info/warn/crit...和设备auth/cron/kern...

    消费者:
    String bingingKeys[] = {""}改为String bingingKeys[] = {"#"},启动第一个消费者;
    再改为String bingingKeys[] = {"kern.*"},启动第二个消费者;
    再改为String bingingKeys[] = {"*.critical"},启动第三个消费者;
    再改为String bingingKeys[] = {"kern.*", "*.critical"},启动第四个消费者。

    生产者,发送多个消息,如:
    路由键为kern.critical 的消息:A critical kernel error
    路由键为kern.info 的消息:A kernel info
    路由键为kern.warn 的消息:A kernel warning
    路由键为auth.critical 的消息:A critical auth error
    路由键为cron.warn 的消息:A cron waning
    路由键为cron.critical 的消息:A critical cron error

    试试最后的结果:第一个消费者将会接收到所有的消息,第二个消费者将会kern的所有严重级别的日志,第三个消费者将会接收到所有设备的critical消息,第四个消费者将会接收到kern设备的所有消息和所有
    critical消息。

    下个教程,我们将会学习如何让消息往返,以此来作为一个远程过程调用(RPC)。


    说明

    ①与原文略有出入,如有疑问,请参阅原文
    ②原文均是编译后通过javacp命令直接运行程序,我是在IDE中进行的,相应的操作做了修改。

    相关文章

      网友评论

          本文标题:【译】RabbitMQ教程五

          本文链接:https://www.haomeiwen.com/subject/odckkxtx.html