美文网首页
初识RabbitMQ(maven版本)(二)

初识RabbitMQ(maven版本)(二)

作者: 程序员小杰 | 来源:发表于2020-10-03 18:31 被阅读0次

    1.1 什么是MQ

    MQ(Message Queue) : 翻译为 消息队列,通过典型的 生产者消费者模型,生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入,轻松的实现系统间解耦。别名为 消息中间件。 可以把消息传递的过程想象成 当你将一个包裹送到邮局,邮局会暂存并最终将邮件通过邮递员送到收件人的手上, RabbitMQ 就好比由邮局、邮箱和邮递员组成的一个系统。

    1.2 MQ有哪些

    当今市面上有很多主流的消息中间件,如老牌的ActiveMQRabbitMQ,炙手可热的Kafka,阿里巴巴自主开发RocketMQ等。

    1.3 不同MQ特点

    # 1.ActiveMQ
            ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。它是一个完全支持JMS规范的的消息中间件。丰富的API,多种集群架构模式让ActiveMQ在业界成为老牌的消息中间件,在中小型企业颇受欢迎!
    
    # 2.Kafka
            Kafka是LinkedIn开源的分布式发布-订阅消息系统,目前归属于Apache顶级项目。Kafka主要特点是基于Pull的模式来处理消息消费,
            追求高吞吐量,一开始的目的就是用于日志收集和传输。0.8版本开始支持复制,不支持事务,对消息的重复、丢失、错误没有严格要求,
            适合产生大量数据的互联网服务的数据收集业务。
    
    # 3.RocketMQ
            RocketMQ是阿里开源的消息中间件,它是纯Java开发,具有高吞吐量、高可用性、适合大规模分布式系统应用的特点。RocketMQ思路起
            源于Kafka,但并不是Kafka的一个Copy,它对消息的可靠传输及事务性做了优化,目前在阿里集团被广泛应用于交易、充值、流计算、消
            息推送、日志流式处理、binglog分发等场景。
    
    # 4.RabbitMQ
            RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。AMQP的主要特征是面向消息、队列、路由(包括点对点和
            发布/订阅)、可靠性、安全。AMQP协议更多用在企业系统内对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在
            其次。
            
    

    RabbitMQ比Kafka可靠,Kafka更适合IO高吞吐的处理,一般应用在大数据日志处理或对实时性(少量延迟),可靠性(少量丢数据)要求稍低的场景使用,比如ELK日志收集。


    2.1 RabbitMQ

    基于AMQP协议,erlang语言开发,是部署最广泛的开源消息中间件,是最受欢迎的开源消息中间件之一。

    核心概念

    • server:又称broker,接受客户端连接,实现AMQP实体服务。

    • connection:连接和具体broker网络连接。

    • channel:网络信道,几乎所有操作都在channel中进行,channel是消息读写的通道。客户端可以建立多个channel,每个channel表示一个会话任务。

    • message:消息,服务器和应用程序之间传递的数据,由properties和body组成。properties可以对消息进行修饰,比如消息的优先级,延迟等高级特性;body是消息实体内容。

    • Virtual host:虚拟主机,用于逻辑隔离,最上层消息的路由。一个Virtual host可以若干个Exchange和Queue,同一个Virtual host不能有同名的Exchange或Queue。

    • Exchange:交换机,接受消息,根据路由键转发消息到绑定的队列上。

    • banding:Exchange和Queue之间的虚拟连接。

    • routing key:一个路由规则,虚拟机根据他来确定如何路由一条消息。

    • Queue:消息队列,用来存放消息的队列。


      image.png

    官网: https://www.rabbitmq.com/
    官方教程: https://www.rabbitmq.com/getstarted.html

    2.2 RabbitMQ特性

    • 1.可靠性
      RabbitMQ使用一些机制来保证可靠性, 如持久化、传输确认及发布确认等,在后续的文章中我们将深入讲述如何保证RabbitMQ消息的可靠性
    • 2.灵活的路由
      在消息进入队列之前,通过交换器来路由消息,对于典型的路由功能,RabbitMQ己经提供了一些内置的交换器来实现。针对更复杂的路由功能,可以将多个交换器绑定在一起,也可以通过插件机制来实现自己的交换器
    • 3.扩展性
      多个RabbitMQ节点可以组成一个集群,也可以根据实际业务情况动态地扩展 集群中节点
    • 4.高可用
      队列可以在集群中的机器上设置镜像,使得在部分节点出现问题的情况下队列仍然可用
    • 5.多种协议
      RabbitMQ除了原生支持AMQP协议,还支持STOMP, MQTT等多种消息中间件协议
    • 6.多语言客户端
      RabbitMQ几乎支持所有常用语言,比如 Java、 Python、 Ruby、 PHP、 C#、 JavaScript等
    • 7.管理界面
      RabbitMQ提供了一个易用的用户界面,使得用户可以监控和管理消息、集群中的节点等。
    • 8.插件机制
      RabbitMQ 提供了许多插件 , 以实现从多方面进行扩展,当然也可以编写自己的插件。

    3.1 RabbitMQ支持的消息模型

    image.png
    image.png

    3.2 引入依赖

        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.7.3</version>
        </dependency>
    

    4.1第一种模型(直连)

    Hello Word

    图解:

    • P:生产者,也就是要发送消息的程序
    • C:消费者:消息的接受者,会一直等待消息到来。
    • hello:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。
    1. 开发生产者
           // 创建连接工厂
            ConnectionFactory connectionFactory = new ConnectionFactory();
          //设置服务地址
            connectionFactory.setHost("47.105.198.54");
          //端口
            connectionFactory.setPort(5672);
            //设置连接哪个虚拟主机
            connectionFactory.setVirtualHost("/test-1");
          //设置用户名、密码
            connectionFactory.setUsername("test");
            connectionFactory.setPassword("123456");
            // 获取连接对象
            Connection connection = connectionFactory.newConnection();
            // 获取连接通道
            Channel channel = connection.createChannel();
    
            /**
             * 参数1:String queue 队列名称 如果队列不存在会自动创建
             * 参数2:boolean durable  队列是否持久化 true 持久化 false 不持久化  队列的声明默认是存放到内存中的,如果rabbitmq重启会丢失,如果想重启之后还存在就要使队列持久化,
             * 参数3:boolean exclusive 是否独占队列 true 独占队列 false 不独占
             * 参数4:boolean autoDelete 当最后一个消费者断开连接之后队列是否自动被删除  true 自动删除,
             * 参数5:Map<String, Object> arguments  额外附加参数
             */
            //通道绑定消息队列
            channel.queueDeclare("hello-1",false,false,false,null);
    
    
            /**
             * 参数1:String exchange  交换机名称
             * 参数2:String routingKey  队列名称
             * 参数3:BasicProperties props  传递消息额外参数
             * 参数4:byte[] body  消息的具体内容
             */
            //发布消息
            channel.basicPublish("","hello-1",null,"hello rabbitmq".getBytes());
    
            //关闭连接
            channel.close();
            connection.close();
    
    2.开发消费者
    // 创建连接工厂
           ConnectionFactory connectionFactory = new ConnectionFactory();
           connectionFactory.setHost("47.105.198.54");
           connectionFactory.setVirtualHost("/test-1");
           connectionFactory.setUsername("test");
           connectionFactory.setPassword("123456");
           //创建连接对象
           Connection connection = connectionFactory.newConnection();
    
           // 创建通道
           Channel channel = connection.createChannel();
    
           /**
            * 该配置尽量与生产者一致
            * '参数1':用来声明通道对应的队列
            * '参数2':用来指定是否持久化队列
            * '参数3':用来指定是否独占队列
            * '参数4':用来指定是否自动删除队列
            * '参数5':对队列的额外配置
            */
           // 通道绑定队列
           channel.queueDeclare("hello-1",false,false,false,null);
    
           /**
            * 参数1:String queue 队列名称
            * 参数2:boolean autoAck 开启消息的自动确认机制
            * 参数3:Consumer callback  消费时回调接口
            */
           //消费消息
           channel.basicConsume("hello-1",true,new DefaultConsumer(channel){
               /**
                *
                * @param consumerTag
                * @param envelope
                * @param properties
                * @param body 消息队列中取出的消息
                * @throws IOException
                */
               @Override
               public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                  System.out.println("消费消息:" + new String(body));
               }
           });
    

    注意:消费者不需要关闭连接
    上述代码有大量重复,所以将重复代码提为工具类。后续都使用工具类。

    工具类
    
    public class RabbitMQUtil {
    //创建连接
        public static Connection createConn() {
            try{
                // 创建连接工厂
                ConnectionFactory connectionFactory = new ConnectionFactory();
                connectionFactory.setHost("47.105.198.54");
                connectionFactory.setPort(5672);
                // 设置连接哪个虚拟主机
                connectionFactory.setVirtualHost("/test-1");
                connectionFactory.setUsername("test");
                connectionFactory.setPassword("123456");
                Connection connection = connectionFactory.newConnection();
                return connection;
            }catch (Exception e){
                e.printStackTrace();
            }
           return null;
        }
    //关闭连接
        public static void closeConn(Channel channel,Connection connection) {
            try{
                if (channel != null){
                    channel.close();
                }
                if(connection != null){
                    connection.close();
                }
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    }
    

    4.2第二种模型(Work queues)

    Work queues,也被称为(Task queues),任务模型。当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。此时就可以使用work 模型:让多个消费者绑定到一个队列,共同消费队列中的消息。队列中的消息一旦消费,就会消失,因此任务是不会被重复执行的。

    Work queues

    图解:

    • P:生产者:任务的发布者
    • C1:消费者-1,领取任务并且完成任务,假设完成速度较慢
    • C2:消费者-2:领取任务并完成任务,假设完成速度快
    1. 开发生产者
    public static void main(String[] args) throws Exception {
    // // 创建连接对象
            Connection conn = RabbitMQUtil.createConn();
    
            Channel channel = conn.createChannel();
    
            channel.queueDeclare("work", false, false, false, null);
    
            for (int i = 0; i < 20; i++) {
                channel.basicPublish("","work",null,(i + "hello word queue").getBytes());
            }
    //关闭连接
            RabbitMQUtil.closeConn(channel,conn);
        }
    
    2.开发消费者-1
     public static void main(String[] args) throws IOException {
            Connection conn = RabbitMQUtil.createConn();
            Channel channel = conn.createChannel();
            channel.queueDeclare("work",false,false,false,null);
          /**
            * 参数1:String queue 队列名称
            * 参数2:boolean autoAck 开启消息的自动确认机制
            * 参数3:Consumer callback  消费时回调接口
            */
            channel.basicConsume("work",true,new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("消费1:" + new String(body));
                }
            });
        }
    

    3.开发消费者-2

     public static void main(String[] args) throws IOException {
            Connection conn = RabbitMQUtil.createConn();
            Channel channel = conn.createChannel();
            channel.queueDeclare("work",false,false,false,null);
          /**
            * 参数1:String queue 队列名称
            * 参数2:boolean autoAck 开启消息的自动确认机制
            * 参数3:Consumer callback  消费时回调接口
            */
            channel.basicConsume("work",true,new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消费2:" + new String(body));
                }
            });
        }
    
    4.测试结果
    消费者1 消费者2
    总结:默认情况下,RabbitMQ将按顺序将每个消息发送给下一个使用者。平均而言,每个消费者都会收到相同数量的消息。这种分发消息的方式称为循环。
    5.消息自动确认机制
    来自官网

    完成一项任务可能需要几秒钟。 你可能想知道,如果其中一个使用者(也就是消费者)开始一项漫长的任务并仅部分完成而死掉(发生异常),会发生什么情况。 使用我们当前的代码,RabbitMQ一旦向消费者传递了一条消息,便立即将其标记为删除。 在这种情况下,如果您杀死一个工人,我们将丢失正在处理的消息。 我们还将丢失所有发送给该特定工作人员但尚未处理的消息。
    但是我们不想丢失任何任务。 如果一个工人死亡,我们希望将任务交付给另一个工人。

    1.消费者-1
    public static void main(String[] args) throws IOException {
            Connection conn = RabbitMQUtil.createConn();
            final Channel channel = conn.createChannel();
            channel.queueDeclare("work",false,false,false,null);
            //一次只接受一条未确认的消息
            channel.basicQos(1);
            /**
             * 参数1:队列名称
             * 参数2:消息自动确认 true 消费者自动向mq确认消息已经消费 false 不会自动确认
             */
            channel.basicConsume("work",false,new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("消费-1:" + new String(body));
                    /**
                     * 参数1:确认队列中那个消息被消费了
                     * 参数2:是否开启多个消息同时确认  true 开启
                     */
                    //手动确认消息
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }
            });
        }
    
    2.消费者-2
      public static void main(String[] args) throws IOException {
            Connection conn = RabbitMQUtil.createConn();
            final Channel channel = conn.createChannel();
            channel.queueDeclare("work",false,false,false,null);
            //一次只接受一条未确认的消息
            channel.basicQos(1);
            /**
             * 参数1:队列名称
             * 参数2:消息自动确认 true 消费者自动向mq确认消息已经消费 false 不会自动确认
             */
            channel.basicConsume("work",false,new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消费-2:" + new String(body));
                    /**
                     * 参数1:确认队列中那个消息被消费了
                     * 参数2:是否开启多个消息同时确认  true 开启
                     */
                    //手动确认消息
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }
            });
        }
    
    • 设置通道一次只能消费一个消息

    • 关闭消息的自动确认,开启手动确认消息

    3.测试结果
    消费者1 消费者2

    4.3 第三种模型(fanout)

    image-20191126213115873.png

    在订阅模式下,消息发送流程是这样的:

    • 可以有多个消费者

    • 每个消费者有自己的queue(队列)

    • 每个队列都要绑定到Exchange(交换机)

    • 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定。

    • 交换机把消息发送给绑定过的所有队列

    • 队列的消费者都能拿到消息。实现一条消息被多个消费者消费
      图解:

    • P:生产者,向Exchange发送消息

    • X:Exchange(交换机),接收生产者的消息,然后把消息递交给队列

    • amq.gen-RQ6、amq.gen-As8:临时队列,接收交换机发送过来的消息

    • C1:消费者,领取任务并且完成任务

    • C2:消费者,领取任务并且完成任务

    1. 开发生产者
    public static void main(String[] args) throws IOException {
            // 创建连接对象
            Connection conn = RabbitMQUtil.createConn();
    
            Channel channel = conn.createChannel();
            /**
             * 参数1: 交换机名称
             * 参数2: 交换机类型  fanout 广播类型
             */
            // 声明fanout交换机(不存在时会创建)
            channel.exchangeDeclare("logs-fanout", BuiltinExchangeType.FANOUT);
    
            /**
             * exchange:交换机名称
             * routingKey:队列名称,在fanout模式下指定也没有作用
             */
            // 发送消息
            channel.basicPublish("logs-fanout","",null,"hello logs-fanout".getBytes());
    
            RabbitMQUtil.closeConn(channel,conn);
        }
    
    2.开发消费者-1
    public static void main(String[] args) throws IOException {
            Connection conn = RabbitMQUtil.createConn();
            Channel channel = conn.createChannel();
    
            //通道绑定交换机
            channel.exchangeDeclare("logs-fanout",BuiltinExchangeType.FANOUT);
    
            // 临时队列
            String queueName = channel.queueDeclare().getQueue();
            //绑定交换机和队列
            channel.queueBind(queueName,"logs-fanout","");
    
            //消费消息
            channel.basicConsume(queueName,true,new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消费者-1:" + new String(body));
                }
            });
        }
    

    3.开发消费者-2

        public static void main(String[] args) throws IOException {
            Connection conn = RabbitMQUtil.createConn();
            Channel channel = conn.createChannel();
    
            //通道绑定交换机
            channel.exchangeDeclare("logs-fanout",BuiltinExchangeType.FANOUT);
    
            // 临时队列
            String queueName = channel.queueDeclare().getQueue();
            //绑定交换机和队列
            channel.queueBind(queueName,"logs-fanout","");
    
            //消费消息
            channel.basicConsume(queueName,true,new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消费者-2:" + new String(body));
                }
            });
        }
    
    image.png
    image.png

    4.4 第四种模型(Routing)

    在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。

    在Direct模型下:

    • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)

    • 消息的发送方在向Exchange发送消息时,也必须指定消息的 RoutingKey

    • Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息

      image-20191126220145375.png
      图解:
    • P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。

    • X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与routing key完全匹配的队列

    • C1:消费者,其所在队列指定了需要routing key 为 error 的消息

    • C2:消费者,其所在队列指定了需要routing key 为 info、error、warning的消息

    1. 开发生产者
     public static void main(String[] args) throws IOException {
            //获取连接对象
            Connection conn = RabbitMQUtil.createConn();
            //获取通道对象
            Channel channel = conn.createChannel();
            /**
             * 参数1:交换机名称
             * 参数2:交换机类型
             */
            //绑定direct交换机(不存在就创建)
            channel.exchangeDeclare("logs_direct", BuiltinExchangeType.DIRECT);
            String key = "error";
            //发送消息并指定一个routing key
            channel.basicPublish("logs_direct",key,null,("指定的route key["+key+"]的消息,"+key+"级别的日志打印").getBytes());
    
            //关闭连接
            RabbitMQUtil.closeConn(channel,conn);
        }
    
    2.开发消费者-1
    public static void main(String[] args) throws IOException {
            Connection conn = RabbitMQUtil.createConn();
    
            Channel channel = conn.createChannel();
            //交换机名称
            String exchangeName = "logs_direct";
            //声明direct交换机
            channel.exchangeDeclare(exchangeName,BuiltinExchangeType.DIRECT);
            //创建临时队列
            String queue = channel.queueDeclare().getQueue();
            //绑定队列和交换机 并指定一个routing key
            channel.queueBind(queue,exchangeName,"error");
            channel.queueBind(queue,exchangeName,"info");
            channel.queueBind(queue,exchangeName,"warning");
            //消费消息
            channel.basicConsume(queue,true,new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消费者-1-error、info、warning级别日志打印:" + new String(body));
                }
            });
        }
    
    3.开发消费者-2
       public static void main(String[] args) throws IOException {
            Connection conn = RabbitMQUtil.createConn();
    
            Channel channel = conn.createChannel();
            String exchangeName = "logs_direct";
            //声明交换机
            channel.exchangeDeclare(exchangeName,BuiltinExchangeType.DIRECT);
            //创建临时队列
            String queue = channel.queueDeclare().getQueue();
            //绑定队列和交换机 并指定一个routing key
            channel.queueBind(queue,exchangeName,"info");
            //消费消息
            channel.basicConsume(queue,true,new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消费者-2-info级别日志打印:" + new String(body));
                }
            });
        }
    
    4.测试结果
    1.测试生产者发送Route key为info的消息时
    消费者1
    消费者2
    2.测试生产者发送Route key为error的消息时
    消费者1
    消费者2

    4.5 第五种模型(Topic)

    Topic类型的ExchangeDirect相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型的Exchange可以让队列在绑定Routing key 的时候使用通配符!这种模型Routingkey 一般都是由一个或多个单词组成,多个单词之间以”.”分割,例如: user.insert

    image-20191127121900255.png

    统配符

    * (star) can substitute for exactly one word. 匹配不多不少恰好1个词
    # (hash) can substitute for zero or more words. 匹配一个或多个词

    如:

    audit.#    匹配audit.irs.corporate或者 audit.irs 等
    audit.*   只能匹配 audit.irs
    
    1.开发生产者
        public static void main(String[] args) throws IOException {
            Connection conn = RabbitMQUtil.createConn();
    
            Channel channel = conn.createChannel();
    //声明交换机和交换机类型 topic 使用动态路由(通配符方式)
            String exchangName = "topics";
           channel.exchangeDeclare(exchangName, BuiltinExchangeType.TOPIC);
    //动态路由key
            String routingKey = "user.save";
    //发布消息
            channel.basicPublish(exchangName,routingKey,null,("动态路由模式的消息 routing key: ["+routingKey + "]").getBytes());
            RabbitMQUtil.closeConn(channel,conn);
        }
    
    2.开发消费者-1

    Routing Key中使用*通配符方式

        public static void main(String[] args) throws IOException {
            Connection conn = RabbitMQUtil.createConn();
            Channel channel = conn.createChannel();
            String exchangName = "topics";
     //声明交换机
            channel.exchangeDeclare(exchangName,BuiltinExchangeType.TOPIC);
    //创建临时队列
            String queue = channel.queueDeclare().getQueue();
    //绑定队列与交换机并设置获取交换机中动态路由
            channel.queueBind(queue,exchangName,"user.*");
    //消费消息
            channel.basicConsume(queue,true,new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消费者1:" + new String(body));
                }
            });
        }
    
    3.开发消费者-2

    Routing Key中使用#通配符方式

      public static void main(String[] args) throws IOException {
            Connection conn = RabbitMQUtil.createConn();
            Channel channel = conn.createChannel();
            String exchangName = "topics";
            //声明交换机
            channel.exchangeDeclare(exchangName,BuiltinExchangeType.TOPIC);
            //创建临时队列
            String queue = channel.queueDeclare().getQueue();
            //绑定队列与交换机并设置获取交换机中动态路由
            channel.queueBind(queue,exchangName,"user.#");
            //消费消息
            channel.basicConsume(queue,true,new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消费者2:" + new String(body));
                }
            });
        }
    
    4.测试

    1.使用Routing Key为user.save


    消费者-1
    消费者-2

    1.使用Routing Key为user.email.save


    消费者-1
    消费者-2

    使用过程

    生产者发送消息过程

    • 生产者连接到 RabbitMQ Broker 建立一个连接(connection),开启一个信道(channel)

    • 生产者声明一个交换器,并设置相关属性,比如交换机类型、是否持久化等

    • 生产者声明一个队列井设置相关属性,比如是否排他、是否持久化、是否自动删除等

    • 生产者通过路由键将交换器和队列绑定起来

    • 生产者发送消息至 RabbitMQ Broker,其中包含路由键、交换器等信息

    • 相应的交换器根据接收到的路由键查找相匹配的队列

    • 如果找到,则将从生产者发送过来的消息存入相应的队列中

    • 如果没有找到 ,则根据生产者配置的属性选择丢弃还是回退给生产者

    • 关闭信道。

    • 关闭连接。

    消费者接收消息过程

    • 消费者连接到 RabbitMQ Broker ,建立一个连接(connection),开启一个信道(channel)
    • 消费者向 RabbitMQ Broker 请求消费相应队列中的消息,可能会设置相应的回调函数,以及做些准备工作
    • 等待 RabbitMQ Broker 回应并投递相应队列中的消息,消费者接收消息。
    • 消费者确认( ack )接收到的消息
    • RabbitMQ 从队列中删除相应己经被确认的消息
    • 关闭信道。
    • 关闭连接。

    相关文章

      网友评论

          本文标题:初识RabbitMQ(maven版本)(二)

          本文链接:https://www.haomeiwen.com/subject/yekyuktx.html