rabbitmq学习

作者: wangxiaoda | 来源:发表于2017-02-19 17:07 被阅读1189次

    RabbitMQ笔记

    本文参考资料:http://blog.csdn.net/chwshuang/article/details/50521708
    学习并添加自己的理解记录的笔记
    官网demo很详细,地址:http://www.rabbitmq.com/getstarted.html


    1.RabbitMQ简介

    AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。
    AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。

    ConnectionFactory、Connection、Channel

    ConnectionFactory、Connection、Channel都是RabbitMQ对外提供的API中最基本的对象。

    Connection是RabbitMQ的socket链接,它封装了socket协议相关部分逻辑。

    ConnectionFactory为Connection的制造工厂。

    Channel是我们与RabbitMQ打交道的最重要的一个接口,我们大部分的业务操作是在Channel这个接口中完成的,包括定义Queue、定义Exchange、绑定Queue与Exchange、发布消息等。

    2.Hello World!

    mac安装rabbitmq

    brew update
    brew install rabbitmq
    

    耐心等待,安装完成后需要将/usr/local/sbin添加到$PATH,可以将下面这两行加到~/.bash_profile:

    # RabbitMQ Config
    export PATH=$PATH:/usr/local/sbin
    

    编辑完后:wq保存退出,使环境变量立即生效。

    source ~/.bash_profile
    
    

    启动rabbitmq

    rabbitmq-server
    

    登录Web管理界面

    浏览器输入localhost:15672,账号密码全输入guest即可登录。

    Paste_Image.png

    登录后管理页面

    Paste_Image.png

    消息生产者

    /**
     * TODO
     * 
     */
    package com.aitongyi.rabbitmq.helloworld;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    /**
     * 消息生产者
     *
     */
    public class P {
    
      private final static String QUEUE_NAME = "hello";
    
      public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // queue 的定义具有幂等性(一个幂等操作的特点是其任意多次执行所产生的影响均与一次执行的影响相等)
        // 因此定义的queue已经存在,不会重复定义,且不能修改。
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        String message = "Hello World!";
        //  第一个参数就是交换器的名称。如果输入“”空字符串,表示使用默认的匿名交换器。
        //  第二个参数是【routingKey】路由线索
        //  匿名交换器规则:
        //  发送到routingKey名称对应的队列。
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
        System.out.println("P [x] Sent '" + message + "'");
    
        channel.close();
        connection.close();
      }
    }
    
    

    消息消费者

    /**
     * TODO
     * 
     */
    package com.aitongyi.rabbitmq.helloworld;
    
    import com.rabbitmq.client.*;
    import java.io.IOException;
    
    /**
     * 消息消费者
     * 
     *
     */
    public class C {
    
        private final static String QUEUE_NAME = "hello";
    
        public static void main(String[] argv) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            // 我们在接收端也定义了hello队列。这是为了确保,如果接收端先启动的时候,队列已经存在。
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            System.out.println("C [*] Waiting for messages. To exit press CTRL+C");
    
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println("C [x] Received '" + message + "'");
                }
            };
            channel.basicConsume(QUEUE_NAME, true, consumer);
        }
    }
    
    

    3.工作队列

    将创建一个工作队列,将信息发送到多个消费者。这中分配方式主要场景是消费者需要根据消息中的内容进行业务逻辑处理,这种消息可以看成是一个任务指令,处理起来比较耗时,通过多个消费者来处理这些消息,来提高数据的吞吐能力。
    工作队列(即任务队列)的主要思想是不用一直等待资源密集型的任务处理完成,这就像一个生产线,将半成品放到生产线中,然后在生产线后面安排多个工人同时对半成品进行处理,这样比一个生产线对应一个工人的吞吐量大几个数量级。

    工厂任务安排者(生产者P)NewTask.java

    /**
     * TODO
     * 
     */
    package com.aitongyi.rabbitmq.queues;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.MessageProperties;
    
    public class NewTask {
    
        private static final String TASK_QUEUE_NAME = "task_queue";
    
        public static void main(String[] argv) throws java.io.IOException, Exception {
    
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
    //      分发消息
            for(int i = 0 ; i < 5; i++){
                String message = "Hello World! " + i;
                channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
                System.out.println(" [x] Sent '" + message + "'");
            }
            channel.close();
            connection.close();
        }
    }
    
    

    工人(消费者C1和C2)Worker1.java

    /**
     * TODO
     * 
     */
    package com.aitongyi.rabbitmq.queues;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    
    public class Worker1 {
        private static final String TASK_QUEUE_NAME = "task_queue";
    
        public static void main(String[] argv) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            final Connection connection = factory.newConnection();
            final Channel channel = connection.createChannel();
    
            channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
            System.out.println("Worker1 [*] Waiting for messages. To exit press CTRL+C");
            // 每次从队列中获取数量
            channel.basicQos(1);
    
            final Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
    
                    System.out.println("Worker1 [x] Received '" + message + "'");
                    try {
                        doWork(message);
                    } finally {
                        System.out.println("Worker1 [x] Done");
                        // 消息处理完成确认
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                }
            };
            // 消息消费完成确认
            channel.basicConsume(TASK_QUEUE_NAME, false, consumer);
        }
    
        private static void doWork(String task) {
            try {
                Thread.sleep(1000); // 暂停1秒钟
            } catch (InterruptedException _ignored) {
                Thread.currentThread().interrupt();
            }
        }
    }
    
    

    工人(消费者C1和C2)Worker2.java

    /**
     * TODO
     * 
     */
    package com.aitongyi.rabbitmq.queues;
    
    import com.rabbitmq.client.*;
    import java.io.IOException;
    
    public class Worker2 {
        private static final String TASK_QUEUE_NAME = "task_queue";
    
        public static void main(String[] argv) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            final Connection connection = factory.newConnection();
            final Channel channel = connection.createChannel();
    
            channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
            System.out.println("Worker2 [*] Waiting for messages. To exit press CTRL+C");
            // 每次从队列中获取数量
            channel.basicQos(1);
    
            final Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
    
                    System.out.println("Worker2 [x] Received '" + message + "'");
                    try {
                        doWork(message);
                    } finally {
                        System.out.println("Worker2 [x] Done");
                        // 消息处理完成确认
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                }
            };
            // 消息消费完成确认
            channel.basicConsume(TASK_QUEUE_NAME, false, consumer);
        }
    
        /**
         * 任务处理
         * 
         * @param task
         *            void
         */
        private static void doWork(String task) {
            try {
                Thread.sleep(1000); // 暂停1秒钟
            } catch (InterruptedException _ignored) {
                Thread.currentThread().interrupt();
            }
        }
    }
    
    

    消息轮询分发

    启动工人(消费者)

    Paste_Image.png

    启动工厂任务安排者(生产者)

    Paste_Image.png

    消费者【1】完成0、3、4

    Paste_Image.png

    消费者【2】完成1、2

    Paste_Image.png

    消息确认(RabbitMQ支持消息确认–ACK)

    如果处理一条消息需要几秒钟的时间,你可能会想,如果在处理消息的过程中,消费者服务器、网络、网卡出现故障挂了,那可能这条正在处理的消息或者任务就没有完成,就会失去这个消息和任务。
    为了确保消息或者任务不会丢失,RabbitMQ支持消息确认–ACK。ACK机制是消费者端从RabbitMQ收到消息并处理完成后,反馈给RabbitMQ,RabbitMQ收到反馈后才将此消息从队列中删除。如果一个消费者在处理消息时挂掉(网络不稳定、服务器异常、网站故障等原因导致频道、连接关闭或者TCP连接丢失等),那么他就不会有ACK反馈,RabbitMQ会认为这个消息没有正常消费,会将此消息重新放入队列中。如果有其他消费者同时在线,RabbitMQ会立即将这个消息推送给这个在线的消费者。这种机制保证了在消费者服务器故障的时候,能不丢失任何消息和任务。

    消息持久化

    如何确保消费者挂掉的情况下,任务不会消失。但是如果RabbitMQ服务器挂了呢?
    如果你不告诉RabbitMQ,当RabbitMQ服务器挂了,她可能就丢失所有队列中的消息和任务。如果你想让RabbitMQ记住她当前的状态和内容,就需要通过2件事来确保消息和任务不会丢失。
    第一件事,在队列声明时,告诉RabbitMQ,这个队列需要持久化:

    boolean durable = true;
    channel.queueDeclare("hello", durable, false, false, null);
    

    已经定义的队列,再次定义是无效的,这就是幂次原理。RabbitMQ不允许重新定义一个已有的队列信息,也就是说不允许修改已经存在的队列的参数。如果你非要这样做,只会返回异常。
    一个快速有效的方法就是重新声明另一个名称的队列,不过这需要修改生产者和消费者的代码,所以,在开发时,最好是将队列名称放到配置文件中。
    这时,即使RabbitMQ服务器重启,新队列中的消息也不会丢失。
    下面我们来看看新消息发送的代码:

    import com.rabbitmq.client.MessageProperties;
    
    channel.basicPublish("", "task_queue",
                MessageProperties.PERSISTENT_TEXT_PLAIN,
                message.getBytes());
    

    关于消息持久化的说明
    标记为持久化后的消息也不能完全保证不会丢失。虽然已经告诉RabbitMQ消息要保存到磁盘上,但是理论上,RabbitMQ已经接收到生产者的消息,但是还没有来得及保存到磁盘上,服务器就挂了(比如机房断电),那么重启后,RabbitMQ中的这条未及时保存的消息就会丢失。因为RabbitMQ不做实时立即的磁盘同步(fsync)。这种情况下,对于持久化要求不是特别高的简单任务队列来说,还是可以满足的。如果需要更强大的保证,那么你可以考虑使用生产者确认反馈机制。

    负载均衡

    默认情况下,RabbitMQ将队列消息随机分配给每个消费者,这时可能出现消息调度不均衡的问题。例如有两台消费者服务器,一个服务器可能非常繁忙,消息不断,另外一个却很悠闲,没有什么负载。RabbitMQ不会主动介入这些情况,还是会随机调度消息到每台服务器。
    这是因为RabbitMQ此时只负责调度消息,不会根据ACK的反馈机制来分析那台服务器返回反馈慢,是不是处理不过来啊。
    为了解决这个问题,我们可以使用【prefetchcount = 1】这个设置。这个设置告诉RabbitMQ,不要一次将多个消息发送给一个消费者。这样做的好处是只有当消费者处理完成当前消息并反馈后,才会收到另外一条消息或任务。这样就避免了负载不均衡的事情了。

    int prefetchCount = 1;
    channel.basicQos(prefetchCount);
    

    RabbitMQ:

    • 生产者是发送消息的应用程序
    • 队列是存储消息的缓冲区
    • 消费者是接收消息的应用程序

    4.发布订阅

    “发布/订阅”模式的基础是将消息广播到所有的接收器上。

    交换器

    实际上,RabbitMQ中消息传递模型的核心思想是:生产者不直接发送消息到队列。实际的运行环境中,生产者是不知道消息会发送到那个队列上,她只会将消息发送到一个交换器,交换器也像一个生产线,她一边接收生产者发来的消息,另外一边则根据交换规则,将消息放到队列中。交换器必须知道她所接收的消息是什么?它应该被放到那个队列中?它应该被添加到多个队列吗?还是应该丢弃?这些规则都是按照交换器的规则来确定的。

    交换器的规则有:

    • direct (直连)
    • topic (主题)
    • headers (标题)
    • fanout (分发)也有翻译为扇出的。

    如使用【fanout】类型创建一个名称为 logs的交换器:

    channel.exchangeDeclare("logs", "fanout");
    

    列出服务器上所有可用的交换器:

    rabbitmqctl list_exchanges
    
    Paste_Image.png

    以【amq.*】开头的交换器都是RabbitMQ默认创建的。在生产环境中,可以自己定义。

    匿名交换器

    我们知道,发送消息到队列时根本没有使用交换器,但是消息也能发送到队列。这是因为RabbitMQ选择了一个空“”字符串的默认交换器。
    来看看我们之前的代码:

    channel.basicPublish("", "hello", null, message.getBytes());
    

    第一个参数就是交换器的名称。如果输入“”空字符串,表示使用默认的匿名交换器。
    第二个参数是【routingKey】路由线索
    匿名交换器规则:
    发送到routingKey名称对应的队列。

    临时队列

    如果要在生产者和消费者之间创建一个新的队列,又不想使用原来的队列,临时队列就是为这个场景而生的:

    • 首先,每当我们连接到RabbitMQ,我们需要一个新的空队列,我们可以用一个随机名称来创建,或者说让服务器选择一个随机队列名称给我们。
    • 一旦我们断开消费者,队列应该立即被删除。

    Java客户端,提供queuedeclare()为我们创建一个非持久化、独立、自动删除的队列名称。

    String queueName = channel.queueDeclare().getQueue();
    

    通过上面的代码就能获取到一个随机队列名称。 例如:它可能是:amq.gen-jzty20brgko-hjmujj0wlg。

    绑定

    将我们的队列跟交换器进行绑定:

    channel.queueBind(queueName, "logs", "");
    

    执行完这段代码后,日志交换器会将消息添加到我们的队列中。

    **获取绑定列表 **

    rabbitmqctl list_bindings
    
    Paste_Image.png

    发布者EmitLog.java

    /**
     * TODO
     * 
     */
    package com.aitongyi.rabbitmq.publish;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    /**
     * 在建立连接后,我们声明了一个交互。
     * 如果当前没有队列被绑定到交换器,消息将被丢弃,因为没有消费者监听,这条消息将被丢弃。
     */
    public class EmitLog {
    
        private static final String EXCHANGE_NAME = "logs";
    
        public static void main(String[] argv) throws Exception {
    
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            // direct (直连)、topic (主题)、headers (标题)、fanout (分发)也有翻译为扇出的。
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    
            // 分发消息
            for(int i = 0 ; i < 5; i++){
                String message = "Hello World! " + i;
                 // 与之前不同的是它不是将消息发送到匿名交换器中,而是发送到一个名为【logs】的交换器中
                 // 我们提供一个空字符串的routingkey,它的功能被交换器的分发类型【fanout】代替了
                 channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
                 System.out.println(" [x] Sent '" + message + "'");
            }
            channel.close();
            connection.close();
        }
    }
    
    

    订阅者ReceiveLogs1.java同ReceiveLogs2.java

    /**
     * TODO
     * 
     */
    package com.aitongyi.rabbitmq.publish;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    
    public class ReceiveLogs1 {
        private static final String EXCHANGE_NAME = "logs";
    
        public static void main(String[] argv) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
    
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
            // 在Java客户端,提供queueDeclare()为我们创建一个非持久化、独立、自动删除的队列名称。
            String queueName = channel.queueDeclare().getQueue();
            // 将我们的队列跟交换器进行绑定
            channel.queueBind(queueName, EXCHANGE_NAME, "");
    
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println(" [x] Received '" + message + "'");
                }
            };
            channel.basicConsume(queueName, true, consumer);
        }
    }
    
    

    结果:ReceiveLogs1和ReceiveLogs2都收到了EmitLog发送的消息。
    使用【rabbitmqctl list_bindings】命令可以看到两个临时队列的名称:

    Paste_Image.png

    4.消息路由

    绑定关系

    绑定是交换器和队列之间的一种关系,用户微博,微信的例子可以简单的理解为关注

    绑定关系中使用的路由关键字【routingkey】是否有效取决于交换器的类型。如果交换器是分发【fanout】类型,就会忽略路由关键字【routingkey】的作用。

    直连类型交换器

    通过分发【fanout】类型的交换器【logs】广播日志信息,现在我们将日志分debug、info、warn、error这几种基本的级别,实际在生产环境中,避免磁盘空间浪费,应用只会将error级别的日志打印出来。而分发【fanout】类型的交换器会将所有基本的日志都发送出来,如果我们想只接收某一级别的日志信息,就需要使用直连【direct】类型的交换器了

    多重绑定

    我们允许多个队列以相同的路由关键字绑定到同一个交换器中,可以看到,交换器虽然是直连类型,但是绑定后的效果却跟分发类型的交换器类似,相同的是队列1和队列2都会收到同一条来自交换器的消息。
    他们的区别:分发模式下,队列1、队列2会收到所有级别(除ERROR级别以外)的消息,而直连模式下,他们仅仅只会收到ERROR关键字类型的消息。


    RoutingSendDirect.java

    /**
     * TODO
     * 
     */
    package com.aitongyi.rabbitmq.routing;
    
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.Channel;
    
    public class RoutingSendDirect {
    
        private static final String EXCHANGE_NAME = "direct_logs";
     // 路由关键字
        private static final String[] routingKeys = new String[]{"info" ,"warning", "error"};
        
        public static void main(String[] argv) throws Exception {
    
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
    //      声明交换器 direct表示直连
            channel.exchangeDeclare(EXCHANGE_NAME, "direct");
    //      发送消息
            for(String severity :routingKeys){
                String message = "Send the message level:" + severity;
                channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
                System.out.println(" [x] Sent '" + severity + "':'" + message + "'");
            }
            channel.close();
            connection.close();
        }
    }
    

    ReceiveLogsDirect1.java

    /**
     * TODO
     * 
     */
    package com.aitongyi.rabbitmq.routing;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    
    public class ReceiveLogsDirect1 {
        // 交换器名称
        private static final String EXCHANGE_NAME = "direct_logs";
        // 路由关键字
        private static final String[] routingKeys = new String[]{"info" ,"warning", "error"};
        
        public static void main(String[] argv) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
    //      声明交换器
            channel.exchangeDeclare(EXCHANGE_NAME, "direct");
    //      获取匿名队列名称
            String queueName = channel.queueDeclare().getQueue();
    //      根据路由关键字进行多重绑定
            for (String severity : routingKeys) {
                channel.queueBind(queueName, EXCHANGE_NAME, severity);
                System.out.println("ReceiveLogsDirect1 exchange:"+EXCHANGE_NAME+", queue:"+queueName+", BindRoutingKey:" + severity);
            }
            System.out.println("ReceiveLogsDirect1 [*] Waiting for messages. To exit press CTRL+C");
    
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");
                }
            };
            channel.basicConsume(queueName, true, consumer);
        }
    }
    
    

    ReceiveLogsDirect2.java

    /**
     * TODO
     * 
     */
    package com.aitongyi.rabbitmq.routing;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    
    public class ReceiveLogsDirect2 {
        // 交换器名称
        private static final String EXCHANGE_NAME = "direct_logs";
        // 路由关键字
        private static final String[] routingKeys = new String[]{"error"};
        
        public static void main(String[] argv) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
    //      声明交换器
            channel.exchangeDeclare(EXCHANGE_NAME, "direct");
    //      获取匿名队列名称
            String queueName = channel.queueDeclare().getQueue();
    //      根据路由关键字进行多重绑定
            for (String severity : routingKeys) {
                channel.queueBind(queueName, EXCHANGE_NAME, severity);
                System.out.println("ReceiveLogsDirect2 exchange:"+EXCHANGE_NAME+", queue:"+queueName+", BindRoutingKey:" + severity);
            }
            System.out.println("ReceiveLogsDirect2 [*] Waiting for messages. To exit press CTRL+C");
    
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");
                }
            };
            channel.basicConsume(queueName, true, consumer);
        }
    }
    
    

    运行ReceiveLogsDirect1和ReceiveLogsDirect2

    Paste_Image.png Paste_Image.png

    运行RoutingSendDirect发送消息

    Paste_Image.png

    查看ReceiveLogsDirect1结果

    Paste_Image.png

    查看ReceiveLogsDirect2结果


    Paste_Image.png

    队列1收到了所有的消息,队列2只收到了error级别的消息。这与我们的预期一样。

    5.Topic模式

    topic类型的交换器允许在RabbitMQ中使用模糊匹配来绑定自己感兴趣的信息。
    如果我想只接收生产者com.test.rabbitmq.topic包下的日志,其他包的忽略掉,之前的日志系统处理起来可能就非常麻烦,还好,我们有匹配模式,现在我们将生产者发送过来的消息按照包名来命名,那么消费者端就可以在匹配模式下使用【#.topic.*】这个路由关键字来获得感兴趣的消息。

    匹配交换器

    通过匹配交换器,我们可以配置更灵活的消息系统,你可以在匹配交换器模式下发送这样的路由关键字:
    “a.b.c”、“c.d”、“quick.orange.rabbit”
    不过一定要记住,路由关键字【routingKey】不能超过255个字节(bytes)
    匹配交换器的匹配符
    *(星号)表示一个单词
    #(井号)表示零个或者多个单词

    交换器在匹配模式下:

    如果消费者端的路由关键字只使用【#】来匹配消息,在匹配【topic】模式下,它会变成一个分发【fanout】模式,接收所有消息。
    如果消费者端的路由关键字中没有【#】或者【*】,它就变成直连【direct】模式来工作。

    TopicSend.java

    /**
     * TODO
     * 
     */
    package com.aitongyi.rabbitmq.topic;
    
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.Channel;
    
    public class TopicSend {
    
        private static final String EXCHANGE_NAME = "topic_logs";
    
        public static void main(String[] argv) {
            Connection connection = null;
            Channel channel = null;
            try {
                ConnectionFactory factory = new ConnectionFactory();
                factory.setHost("localhost");
    
                connection = factory.newConnection();
                channel = connection.createChannel();
    //          声明一个匹配模式的交换器
                channel.exchangeDeclare(EXCHANGE_NAME, "topic");
    
                // 待发送的消息
                String[] routingKeys = new String[]{"quick.orange.rabbit", 
                                                    "lazy.orange.elephant", 
                                                    "quick.orange.fox", 
                                                    "lazy.brown.fox", 
                                                    "quick.brown.fox", 
                                                    "quick.orange.male.rabbit", 
                                                    "lazy.orange.male.rabbit"};
    //          发送消息
                for(String severity :routingKeys){
                    String message = "From "+severity+" routingKey' s message!";
                    channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
                    System.out.println("TopicSend [x] Sent '" + severity + "':'" + message + "'");
                }
                
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                if (connection != null) {
                    try {
                        connection.close();
                    } catch (Exception ignore) {
                    }
                }
            }
        }
    }
    

    ReceiveLogsTopic1.java

    /**
     * TODO
     * 
     */
    package com.aitongyi.rabbitmq.topic;
    
    import com.rabbitmq.client.*;
    import java.io.IOException;
    
    public class ReceiveLogsTopic1 {
    
        private static final String EXCHANGE_NAME = "topic_logs";
         
        public static void main(String[] argv) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
    //      声明一个匹配模式的交换器
            channel.exchangeDeclare(EXCHANGE_NAME, "topic");
            String queueName = channel.queueDeclare().getQueue();
            // 路由关键字
            String[] routingKeys = new String[]{"*.orange.*"};
    //      绑定路由关键字
            for (String bindingKey : routingKeys) {
                channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
                System.out.println("ReceiveLogsTopic1 exchange:"+EXCHANGE_NAME+", queue:"+queueName+", BindRoutingKey:" + bindingKey);
            }
    
            System.out.println("ReceiveLogsTopic1 [*] Waiting for messages. To exit press CTRL+C");
    
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println("ReceiveLogsTopic1 [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");
                }
            };
            channel.basicConsume(queueName, true, consumer);
        }
    }
    

    ReceiveLogsTopic2.java

    /**
     * TODO
     * 
     */
    package com.aitongyi.rabbitmq.topic;
    
    import com.rabbitmq.client.*;
    import java.io.IOException;
    
    public class ReceiveLogsTopic2 {
    
        private static final String EXCHANGE_NAME = "topic_logs";
         
        public static void main(String[] argv) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
    //      声明一个匹配模式的交换器
            channel.exchangeDeclare(EXCHANGE_NAME, "topic");
            String queueName = channel.queueDeclare().getQueue();
            // 路由关键字
            String[] routingKeys = new String[]{"*.*.rabbit", "lazy.#"};
    //      绑定路由关键字
            for (String bindingKey : routingKeys) {
                channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
                System.out.println("ReceiveLogsTopic2 exchange:"+EXCHANGE_NAME+", queue:"+queueName+", BindRoutingKey:" + bindingKey);
            }
    
            System.out.println("ReceiveLogsTopic2 [*] Waiting for messages. To exit press CTRL+C");
    
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println("ReceiveLogsTopic2 [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");
                }
            };
            channel.basicConsume(queueName, true, consumer);
        }
    }
    

    运行结果

    TopicSend发送7条数据


    Paste_Image.png

    ReceiveLogsTopic1接收3条


    Paste_Image.png

    ReceiveLogsTopic1接收4条


    Paste_Image.png

    相关文章

      网友评论

        本文标题:rabbitmq学习

        本文链接:https://www.haomeiwen.com/subject/vhtbittx.html