美文网首页RabbitMQ工作生活
Rabbitmq打怪升级之路(六)生产者与消费者模型

Rabbitmq打怪升级之路(六)生产者与消费者模型

作者: 亚武de小文 | 来源:发表于2019-07-02 19:36 被阅读0次

简书:亚武de小文 【原创:转载请注明出处】

生产者与消费者模型

LengToo上学.png

RabbitMQ 整体上是一个生产者与消费者模型,主要负责接收、存储和转发消息。可以把消息传递的过程想象成:当你将一个包裹送到邮局,邮局会暂存并最终将邮件通过邮递员送到收件人的手上, RabbitMQ 就好比由邮局、邮箱和邮递员组成的一个系统。


一、基本模型图
[亚武de小文]生产者消费者模型图.png
二、工作流程
  • 发送端
    1)创建连接 2)创建通道 3)声明队列 4)发送消息
  • 接收端
    1)创建连接 2)创建通道 3)声明队列 4)监听队列 5)接收消息 6)ack回复
三、代码
生产者-发件人
  • MsgProducer.java

    package com.yawu.xiaowen.pc;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * 生产者-发件人
     * @date 2019.06.25
     * @author yawu
     */
    public class MsgProducer {
        private static final String QUEUE_NAME = "mq_pc_hello";
        private static final Logger LOGGER = LoggerFactory.getLogger(MsgProducer.class);
    
        public static void main(String[] args) throws IOException, TimeoutException {
    
            Connection connection = null;
            Channel channel = null;
            try {
                // 连接管理器:应用程序与RabbitMQ建立连接的管理器。
                ConnectionFactory factory = new ConnectionFactory();
                // 服务器地址
                factory.setHost("127.0.0.1");
                // 帐号密码:默认为guest/guest,可省略
                factory.setUsername("guest");
                factory.setPassword("guest");
                // 新建连接
                connection = factory.newConnection();
                // 再创建一个信道
                channel = connection.createChannel();
    
                //1、在信道中声明一个队列
                /**
                 * 参数详解
                 * queue:要创建的队列名
                 * durable:是否持久化。如果为true,可以在RabbitMQ崩溃后恢复消息
                 * exclusive:true表示一个队列是否独占连接,
                 * autoDelete:true表示服务器不在使用这个队列是会自动删除它
                 * arguments:其它属性参数
                 */
                channel.queueDeclare(QUEUE_NAME, true, false, false, null);
                //2、创建一条消息
                String message = "Hello,亚武de小文!";
                // 3、采用二进制流的方式传输
                byte[] msg = message.getBytes("UTF-8");
                // 4、channel是一个信道,它接收到msg数据,并将纳入到QUEUE_NAME队列中
                /**
                 * 消息发布方法参数详解:
                 * exchange:如果没有指定,则使用Default Exchange
                 * routingKey:消息的路由Key,是用于Exchange(交换机)将消息转发到指定的消息队列
                 * props:消息包含的属性
                 * body:消息体
                 */
                channel.basicPublish("", QUEUE_NAME, null, msg);
    
                LOGGER.info("发件人---发送消息:{}", message);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                if (channel != null) {
                    channel.close();
                }
                if (connection != null) {
                    connection.close();
                }
            }
    
        }
    }
    
    
消费者-收件人
  • MsgConsumer.java

    package com.yawu.xiaowen.pc;
    
    import com.rabbitmq.client.*;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.io.IOException;
    
    /**
     * 消费者-收件人
     * @date 2019.06.25
     * @author yawu
     */
    public class MsgConsumer {
        private static final String QUEUE_NAME = "mq_pc_hello";
        private static final Logger LOGGER = LoggerFactory.getLogger(MsgConsumer.class);
    
        public static void main(String[] args) {
            try {
                // 应用程序与RabbitMQ建立连接的管理器。
                ConnectionFactory factory = new ConnectionFactory();
                // 服务器地址
                factory.setHost("127.0.0.1");
    
                // 新建一个连接
                Connection connection = factory.newConnection();
                // 创建一个信道
                Channel channel = connection.createChannel();
    
                //1、首先在通道中申明一个队列
                /**
                 * 参数详解
                 * queue:要创建的队列名
                 * durable:是否持久化。如果为true,可以在RabbitMQ崩溃后恢复消息
                 * exclusive:true表示一个队列只能被一个消费者占有并消费
                 * autoDelete:true表示服务器不在使用这个队列是会自动删除它
                 * arguments:其它参数
                 */
                channel.queueDeclare(QUEUE_NAME, true, false, false, null);
    
                //2、创建消费者,并重写如何消费的方法,eg:输出消息
                //3、首先从信道里面获取数据
                Consumer consumer = new DefaultConsumer(channel) {
                    /**
                     * 消费者接收消息调用此方法
                     * @param consumerTag   消费者的标签,在channel.basicConsume()去指定
                     * @param envelope  消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志
                     * (收到消息失败后是否需要重新发送)
                     * @param properties
                     * @param body
                     * @throws IOException
                     */
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                            throws IOException {
                        String message = new String(body, "UTF-8");
                        LOGGER.info("收件人---收到消息:{}", message);
                    }
                };
    
                /**
                 * 4、收到了消息后,提示信道已经收到消息了。可以继续发送其它消息
                 * 【注】第二个参数autoAck如果为false,那么消息会一直保存在RabbitMQ服务器中,Unacked
                 * 消费者没有确认消息被消费,消息一直留在队列中,只有当从有新的消费者加入时,消息被分发到新的消费者。
                 */
                channel.basicConsume(QUEUE_NAME, true, consumer);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    
    
四、几种情况运行与分析
1、分别启动生产者服务和消费者服务
PC同启1.png PC同启2.png PC同启3.png
2、关闭生产者服务,开启消费者服务
P关C启1.png P关C启2.png
3、关闭消费者服务,开启生产者服务
P启C关1.png

该信息处于队列中等待状态,等待消费者消费

P启C关2.png P启C关3.png
4、服务都保持启动
  • 设置autoAck参数为false

    /**
     * 4、收到了消息后,提示信道已经收到消息了。可以继续发送其它消息
     * 【注】第二个参数autoAck如果为false,那么消息会一直保存在RabbitMQ服务器中
     * 消费者没有确认消息被消费,消息一直留在队列中,只有当从有新的消费者加入时,消息被分发到新的消费者。
     */
    channel.basicConsume(QUEUE_NAME, false, consumer);
    
    PC同启_autoAck1.png
  • 生产者发送多条信息(此处我发出五条消息)


    PC同启_autoAck2.png

相关文章

  • Rabbitmq打怪升级之路(六)生产者与消费者模型

    简书:亚武de小文 【原创:转载请注明出处】 生产者与消费者模型 RabbitMQ 整体上是一个生产者与消费者模型...

  • Rabbitmq打怪升级之路(七)AMQP协议

    简书:亚武de小文 【原创:转载请注明出处】 生产者与消费者模型 RabbitMQ 是遵从 AMQP 协议的, 换...

  • RabbitMQ(二):Java 操作队列

    1. 简单模式 模型: P:消息的生产者 队列:rabbitmq C:消息的消费者 获取 MQ 连接 生产者生产消...

  • 生产者与消费者模型

    生产者与消费者模型 通过使用Object的wait(),notify()方法进行生产者与消费者模型中出现的数据同步...

  • RabbitMQ 概念介绍

    RabbitMQ 概念介绍 信道:信道是生产者/消费者与 RabbitMQ 通信的渠道。信道是建立在 TCP 连接...

  • 34.Python之生产者消费者模型

    Python之生产者消费者模型(非常重要) 生产者消费者模型模型指的是一种解决问题的套路。 生产者消费者模型中包含...

  • 生产者和消费者模型

    生产者和消费者模型 1. 什么是生产者和消费者模型 生产者消费者模型具体来讲,就是在一个系统中,存在生产者和消费者...

  • 生产者消费者(一)

    生产者消费者模型: 生产者------> 缓存<-------- 消费者

  • RabbitMQ的概述

    RabbitMQ整体上是一个生产者与消费者模型,主要负责接收、存储和转发消息。可以把消息传递的过程想象成:当你将一...

  • 二、RabbitMQ架构模型

    RabbitMQ架构模型 一、生产者和消费者 二、队列 三、交换器、路由键、绑定 Exchange:交换器。 Ra...

网友评论

    本文标题:Rabbitmq打怪升级之路(六)生产者与消费者模型

    本文链接:https://www.haomeiwen.com/subject/sznthctx.html