美文网首页Java 杂谈
RabbitMQ:Topic类型的exchange

RabbitMQ:Topic类型的exchange

作者: AubreyXue | 来源:发表于2018-07-02 14:24 被阅读115次
    9824247-87bc1d746154471c.jpeg

    尽管使用了direct类型的exchange对日志系统有所提升,但还是有一些限制(消息不能够基于多重因素来路由)。

    在我们的日志系统中,希望不仅仅能够根据日志级别来订阅,还可以根据指定的routing key来订阅。你应该可以理解的,就如unix的系统日志工具,日志消息路由规则不仅仅基于日志级别(info/warn/crit…),还可以基于设备(auth/cron/kern...)。

    这样大大的提高的灵活性,例如我们可以只监听kern推送出来的error级别的日志。

    为了在我们的日志记录系统中实现这样的功能,我们需要了解更多关于topic类型的exchange。

    1、Topic类型的exchange

    消息发送到topic类型的exchange上时不能随意指定routing_key(一定是指由一系列由点号连接单词的字符串,单词可以是任意的,但一般都会与消息或多或少的有些关联)。Routing key的长度不能超过255个字节。

    Binding key也一定要是同样的方式。Topic类型的exchange就像一个直接的交换:一个由生产者指定了确定routing key的消息将会被推送给所有Binding key能与之匹配的消费者。然而这种绑定有两种特殊的情况:

    • *(星号):可以(只能)匹配一个单词
    • #(井号):可以匹配多个单词(或者零个)
    image.png

    在这个例子中,我们将会发送一些描述动物的消息。Routing key的第一个单词是描述速度的,第二个单词是描述颜色的,第三个是描述物种的:“<speed>.<colour>.<species>”。

    这里我们创建三个Binding:Binding key为”.orange.”的Q1,和binding key为”..rabbit”和”lazy.#”的Q2。

    这些binding可以总结为:

    Q1对所有橘色的(orange)的动物感兴趣;
    Q2希望能拿到所有兔子的(rabbit)信息,还有比较懒惰的(lazy.#)动物信息。
    一条以” quick.orange.rabbit”为routing key的消息将会推送到Q1和Q2两个queue上,routing key为“lazy.orange.elephant”的消息同样会被推送到Q1和Q2上。但如果routing key为”quick.orange.fox”的话,消息只会被推送到Q1上;routing key为”lazy.brown.fox”的消息会被推送到Q2上,routing key为"lazy.pink.rabbit”的消息也会被推送到Q2上,但同一条消息只会被推送到Q2上一次。

    如果在发送消息时所指定的exchange和routing key在消费者端没有对应的exchange和binding key与之绑定的话,那么这条消息将会被丢弃掉。例如:"orange"和"quick.orange.male.rabbit"。但是routing为”lazy.orange.male.rabbit”的消息,将会被推到Q2上。

    Topic类型的exchange是很强大的,也可以实现其它类型的exchange。

    • 当一个队列被绑定为binding key为”#”时,它将会接收所有的消息,此时和fanout类型的exchange很像。
    • 当binding key不包含”*”和”#”时,这时候就很像direct类型的exchange。
      注:以上内容摘自网上其他大佬的讲解,这些东西一看就懂,类似于模糊查询;

    2、生产者

    在这里我设置了几个不同类型的routingKey

    package com.hrabbit.rabbitmq.toptic.send;
    
    import com.hrabbit.rabbitmq.utils.ConnectionUtils;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * @Auther: hrabbit
     * @Date: 2018-06-30 下午7:41
     * @Description:
     */
    public class Send {
    
        //交换机名称
        private final static String EXCHANGE_NAME = "hrabbit_exchange_topic";
    
        public static void main(String[] args) throws IOException, TimeoutException {
    
            // 获取到连接以及mq通道
            Connection connection = ConnectionUtils.getConnection();
            Channel channel = connection.createChannel();
            //声明一个交换机,一个参数为交换机名称,第二个参数为模式
            channel.exchangeDeclare(EXCHANGE_NAME, "topic");
            //待发送的消息
            String[] routingKeys=new String[]{
                    "quick.orange.rabbit",
                    "lazy.orange.elephant",
                    "quick.orange.fox",
                    "lazy.brown.fox",
                    "quick.brown.fox",
                    "quick.orange.male.rabbit",
                    "lazy.orange.male.rabbit"
            };
            //发送消息
            for(String severity :routingKeys){
                String message = "From "+severity+" routingKey' s message!";
                channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
                System.out.println("Send: '" + severity + "':'" + message + "'");
            }
            channel.close();
            connection.close();
        }
    }
    

    3、消费者1

    此类型数据的规则匹配的是以orange为中间的,头与尾为任意类型的数据
    channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.orange.*");

    package com.hrabbit.rabbitmq.toptic.recover;
    
    import com.hrabbit.rabbitmq.utils.ConnectionUtils;
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * @Auther: hrabbit
     * @Date: 2018-06-30 下午7:42
     * @Description:
     */
    public class Recover {
    
        //队列名称
        private final static String QUEUE_NAME = "hrabbit_queue_topic_1";
        //交换机名称
        private final static String EXCHANGE_NAME = "hrabbit_exchange_topic";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            // 获取到连接以及mq通道
            Connection connection = ConnectionUtils.getConnection();
            final Channel channel = connection.createChannel();
    
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // 绑定队列到交换机
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.orange.*");
            //------------下面逻辑和work模式一样-----
            // 同一时刻服务器只会发一条消息给消费者
            channel.basicQos(1);
    
            Consumer consumer = new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    // 消息到达 触发这个方法
                    String msg = new String(body, "utf-8");
                    System.out.println("[*消息]:" + msg);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        System.out.println("error消息执行完毕.!");
                        // 手动回执
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                }
            };
    
            boolean autoAck = false;
            channel.basicConsume(QUEUE_NAME,autoAck,consumer);
    
        }
    }
    

    3、消费者2

    配置的规则是以rabbit结尾,前面有任意两种格式的数据
    channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.*.rabbit");
    匹配的规则是以lazy开始的任意规则的数据
    channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "lazy.#");

    package com.hrabbit.rabbitmq.toptic.recover;
    
    import com.hrabbit.rabbitmq.utils.ConnectionUtils;
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * @Auther: hrabbit
     * @Date: 2018-06-30 下午7:42
     * @Description:
     */
    public class Recover2 {
    
        //队列名称
        private final static String QUEUE_NAME = "hrabbit_queue_topic_2";
        //交换机名称
        private final static String EXCHANGE_NAME = "hrabbit_exchange_topic";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            // 获取到连接以及mq通道
            Connection connection = ConnectionUtils.getConnection();
            final Channel channel = connection.createChannel();
    
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // 绑定队列到交换机
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.*.rabbit");
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "lazy.#");
            //------------下面逻辑和work模式一样-----
            // 同一时刻服务器只会发一条消息给消费者
            channel.basicQos(1);
    
            Consumer consumer = new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    // 消息到达 触发这个方法
                    String msg = new String(body, "utf-8");
                    System.out.println("[info]:" + msg);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        System.out.println("info消息执行完毕.!");
                        // 手动回执
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                }
            };
            boolean autoAck = false;
            channel.basicConsume(QUEUE_NAME,autoAck,consumer);
        }
    }
    

    4、测试结果

    消费者1结果:


    image.png

    消费者1输出的数据我们可以发现,都是以orange为中间,任意数据开头和结尾的数据。
    消费者2结果:


    image.png
    消费者2号输出的数据我们可以发现,配置的规则是以rabbit结尾,前面有任意两种格式的数据或者匹配的规则是以lazy开始的任意规则的数据。

    因此:

    • *(星号):可以(只能)匹配一个单词
    • #(井号):可以匹配多个单词(或者零个)

    系列文章:

    RabbitMQ:RabbitMQ-理论基础
    RabbitMQ:RabbitMQ:快速入门hello word
    RabbitMQ:RabbitMQ:work queues 工作队列(Round-robin/Fair dispatch)
    RabbitMQ:RabbitMQ:消息应答与消息持久化
    RabbitMQ:发布/订阅 Publish/Subscribe
    RabbitMQ:路由Routing
    RabbitMQ:RabbitMQ之消息确认机制(事务+Confirm)
    RabbitMQ:spring整合RabbitMQ

    相关文章

      网友评论

        本文标题:RabbitMQ:Topic类型的exchange

        本文链接:https://www.haomeiwen.com/subject/zzgkuftx.html