RabbitMQ:路由模式Routing

作者: AubreyXue | 来源:发表于2018-06-30 20:13 被阅读55次
    9824247-2f04485338ca6443.jpg

    上一篇中我们构建了一个简单的日志系统,我们可以把日志消息广播给多个接受者。

    这篇中我们将来添加一个特性只接收部分消息。例如我只将一些错误log存到文件中,把所有的log都打印到控制台里。


    WX20180630-192459@2x.png

    1、绑定(Bindings)

    在上篇博文中,我们已经创建了一个binding,代码如下:

    channel.queueBind(queueName, EXCHANGE_NAME, "");
    

    一个binding就是exchange和Queue之间的一个关系。可以简单的理解为:这个Queue对其相对于的exchange的消息之间建立了一个关系。

    Binding可以使用一个已经存在的routingKey参数。为了避免和basic_publish参数混淆,我们称之为binding key。下边就是我们怎么用key来创建一个binding:

    channel.queueBind(queueName, EXCHANGE_NAME, "black");
    

    binding key的意义有时候取决于exchange的类型。对于Fanout类型的exchange,会忽略binding key。

    2、Direct类型的exchange

    我们上篇博文中的日志系统会把所有的log消息广播给所有的消费者。我们想扩展来根据他们的日志级别来过滤log消息。例如:我们只想把error级别的日志写到磁盘文件中,而其它级别的日志消息则过滤掉。

    我们之前使用的fanout类型的exchange,但这样就不会有太多的灵活性。

    在这里我们将要使用direct类型的exchange。Direct类型exchange的路由算法是很简单的:要想一个消息能到达这个队列,需要binding key和routing key正好能匹配得上。

    WX20180630-192459@2x.png

    在这样的结构中,我们可以看到direct类型的exchange X,有两个queue绑定到它。第一个queue是以orange为binding key绑定到exchange X上的,第二个queue是由两个binding key(black和green)绑定到exchange X的。

    在这样的设置中,一条消息被推送到exchange,如果使用的routing key是error,那么消息就会被路由到C1中;如果使用的routing key是error或者info或者warning,那么该消息将会被路由到C2中。其它的消息都将会被丢弃掉。

    3、多重绑定(Multiple bindings)

    221512183874891.png

    用同一个binding来把多个queue绑定到同一个exchange也是可行的。例如在之前例子的基础上,在X和Q1之间添加binding key名字为black,这样的话,这里的direct类型的exchange就和fanout类型的一样了,可以把消息推送给所有的queue。带有routing key为black的消息将会被推送到Q1和Q2中。

    4、发送日志(Emitting logs)

    我们将会使用这种模型,不使用fanout类型的exchange,而是使用direct类型的。我们使用日志级别做为routing key,接收端根据设置的日志级别做为binding key来接收消息。首先来看看发射日志:

    如之前一样,首先来创建一个exchange:

    channel.exchangeDeclare(EXCHANGE_NAME, "direct");
    

    然后准备发送消息;

    channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
    

    这里的”severity”可以是”info”、“warning”、”error”等。

    那么下面我们用代码实现以下:

    5、生产者

    package com.hrabbit.rabbitmq.routing.send;
    
    import com.hrabbit.rabbitmq.utils.ConnectionUtils;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * @Auther: hrabbit
     * @Date: 2018-06-30 下午7:41
     * @Description:
     */
    public class Send {
    
        //交换机名称
        private final static String EXCHANGE_NAME = "hrabbit_exchange_direct";
    
        public static void main(String[] args) throws IOException, TimeoutException {
    
            // 获取到连接以及mq通道
            Connection connection = ConnectionUtils.getConnection();
            Channel channel = connection.createChannel();
            //声明一个交换机,一个参数为交换机名称,第二个参数为模式
            channel.exchangeDeclare(EXCHANGE_NAME, "direct");
            // 消息内容
            String message = "id=1的商品删除了";
    
            channel.basicPublish(EXCHANGE_NAME, "info", null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");
    
            channel.close();
            connection.close();
        }
    }
    
    

    在上面的生产者我发送了一个info类型的内容,此时应该C2可以接受到这条消息。

    6、消费者1号

    消费者定义的routingKey 为error

    package com.hrabbit.rabbitmq.routing.recover;
    
    import com.hrabbit.rabbitmq.utils.ConnectionUtils;
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * @Auther: hrabbit
     * @Date: 2018-06-30 下午7:42
     * @Description:
     */
    public class Recover {
    
        //队列名称
        private final static String QUEUE_NAME = "hrabbit_queue_direct_1";
        //交换机名称
        private final static String EXCHANGE_NAME = "hrabbit_exchange_direct";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            // 获取到连接以及mq通道
            Connection connection = ConnectionUtils.getConnection();
            final Channel channel = connection.createChannel();
    
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // 绑定队列到交换机
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
            //------------下面逻辑和work模式一样-----
            // 同一时刻服务器只会发一条消息给消费者
            channel.basicQos(1);
    
            Consumer consumer = new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    // 消息到达 触发这个方法
                    String msg = new String(body, "utf-8");
                    System.out.println("[error]:" + msg);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        System.out.println("error消息执行完毕!");
                        // 手动回执
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                }
            };
    
            boolean autoAck = false;
            channel.basicConsume(QUEUE_NAME,autoAck,consumer);
    
        }
    }
    
    

    6、消费者2号

    消费者定义的routingKey 为errorinfowarning

    package com.hrabbit.rabbitmq.routing.recover;
    
    import com.hrabbit.rabbitmq.utils.ConnectionUtils;
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * @Auther: hrabbit
     * @Date: 2018-06-30 下午7:42
     * @Description:
     */
    public class Recover2 {
    
        //队列名称
        private final static String QUEUE_NAME = "hrabbit_queue_direct_2";
        //交换机名称
        private final static String EXCHANGE_NAME = "hrabbit_exchange_direct";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            // 获取到连接以及mq通道
            Connection connection = ConnectionUtils.getConnection();
            final Channel channel = connection.createChannel();
    
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // 绑定队列到交换机
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "warning");
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");
            //------------下面逻辑和work模式一样-----
            // 同一时刻服务器只会发一条消息给消费者
            channel.basicQos(1);
    
            Consumer consumer = new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    // 消息到达 触发这个方法
                    String msg = new String(body, "utf-8");
                    System.out.println("[info]:" + msg);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        System.out.println("info消息执行完毕!");
                        // 手动回执
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                }
            };
    
            boolean autoAck = false;
            channel.basicConsume(QUEUE_NAME,autoAck,consumer);
    
        }
    }
    
    

    7.测试结果

    在消费者2中输出了类型info的消息

    image.png

    总结:
    要记住生产者端的routing key,那么在消费者端设置binding key和之前的routing key一样,就可以用direct类型的exchange了,以此来获取到自己需要的消息。

    系列文章:

    RabbitMQ:RabbitMQ-理论基础
    RabbitMQ:快速入门hello word
    RabbitMQ:RabbitMQ:work queues 工作队列(Round-robin/Fair dispatch)
    RabbitMQ:RabbitMQ:消息应答与消息持久化
    RabbitMQ:发布/订阅 Publish/Subscribe
    RabbitMQ:Topic类型的exchange
    RabbitMQ:RabbitMQ之消息确认机制(事务+Confirm)
    RabbitMQ:spring整合RabbitMQ

    相关文章

      网友评论

        本文标题:RabbitMQ:路由模式Routing

        本文链接:https://www.haomeiwen.com/subject/axhzyftx.html