美文网首页RabbitMQ工作生活
Rabbitmq打怪升级之路(六)生产者与消费者模型

Rabbitmq打怪升级之路(六)生产者与消费者模型

作者: 亚武de小文 | 来源:发表于2019-07-02 19:36 被阅读0次

    简书:亚武de小文 【原创:转载请注明出处】

    生产者与消费者模型

    LengToo上学.png

    RabbitMQ 整体上是一个生产者与消费者模型,主要负责接收、存储和转发消息。可以把消息传递的过程想象成:当你将一个包裹送到邮局,邮局会暂存并最终将邮件通过邮递员送到收件人的手上, RabbitMQ 就好比由邮局、邮箱和邮递员组成的一个系统。


    一、基本模型图
    [亚武de小文]生产者消费者模型图.png
    二、工作流程
    • 发送端
      1)创建连接 2)创建通道 3)声明队列 4)发送消息
    • 接收端
      1)创建连接 2)创建通道 3)声明队列 4)监听队列 5)接收消息 6)ack回复
    三、代码
    生产者-发件人
    • MsgProducer.java

      package com.yawu.xiaowen.pc;
      
      import com.rabbitmq.client.Channel;
      import com.rabbitmq.client.Connection;
      import com.rabbitmq.client.ConnectionFactory;
      import org.slf4j.Logger;
      import org.slf4j.LoggerFactory;
      
      import java.io.IOException;
      import java.util.concurrent.TimeoutException;
      
      /**
       * 生产者-发件人
       * @date 2019.06.25
       * @author yawu
       */
      public class MsgProducer {
          private static final String QUEUE_NAME = "mq_pc_hello";
          private static final Logger LOGGER = LoggerFactory.getLogger(MsgProducer.class);
      
          public static void main(String[] args) throws IOException, TimeoutException {
      
              Connection connection = null;
              Channel channel = null;
              try {
                  // 连接管理器:应用程序与RabbitMQ建立连接的管理器。
                  ConnectionFactory factory = new ConnectionFactory();
                  // 服务器地址
                  factory.setHost("127.0.0.1");
                  // 帐号密码:默认为guest/guest,可省略
                  factory.setUsername("guest");
                  factory.setPassword("guest");
                  // 新建连接
                  connection = factory.newConnection();
                  // 再创建一个信道
                  channel = connection.createChannel();
      
                  //1、在信道中声明一个队列
                  /**
                   * 参数详解
                   * queue:要创建的队列名
                   * durable:是否持久化。如果为true,可以在RabbitMQ崩溃后恢复消息
                   * exclusive:true表示一个队列是否独占连接,
                   * autoDelete:true表示服务器不在使用这个队列是会自动删除它
                   * arguments:其它属性参数
                   */
                  channel.queueDeclare(QUEUE_NAME, true, false, false, null);
                  //2、创建一条消息
                  String message = "Hello,亚武de小文!";
                  // 3、采用二进制流的方式传输
                  byte[] msg = message.getBytes("UTF-8");
                  // 4、channel是一个信道,它接收到msg数据,并将纳入到QUEUE_NAME队列中
                  /**
                   * 消息发布方法参数详解:
                   * exchange:如果没有指定,则使用Default Exchange
                   * routingKey:消息的路由Key,是用于Exchange(交换机)将消息转发到指定的消息队列
                   * props:消息包含的属性
                   * body:消息体
                   */
                  channel.basicPublish("", QUEUE_NAME, null, msg);
      
                  LOGGER.info("发件人---发送消息:{}", message);
              } catch (Exception e) {
                  e.printStackTrace();
              } finally {
                  if (channel != null) {
                      channel.close();
                  }
                  if (connection != null) {
                      connection.close();
                  }
              }
      
          }
      }
      
      
    消费者-收件人
    • MsgConsumer.java

      package com.yawu.xiaowen.pc;
      
      import com.rabbitmq.client.*;
      import org.slf4j.Logger;
      import org.slf4j.LoggerFactory;
      
      import java.io.IOException;
      
      /**
       * 消费者-收件人
       * @date 2019.06.25
       * @author yawu
       */
      public class MsgConsumer {
          private static final String QUEUE_NAME = "mq_pc_hello";
          private static final Logger LOGGER = LoggerFactory.getLogger(MsgConsumer.class);
      
          public static void main(String[] args) {
              try {
                  // 应用程序与RabbitMQ建立连接的管理器。
                  ConnectionFactory factory = new ConnectionFactory();
                  // 服务器地址
                  factory.setHost("127.0.0.1");
      
                  // 新建一个连接
                  Connection connection = factory.newConnection();
                  // 创建一个信道
                  Channel channel = connection.createChannel();
      
                  //1、首先在通道中申明一个队列
                  /**
                   * 参数详解
                   * queue:要创建的队列名
                   * durable:是否持久化。如果为true,可以在RabbitMQ崩溃后恢复消息
                   * exclusive:true表示一个队列只能被一个消费者占有并消费
                   * autoDelete:true表示服务器不在使用这个队列是会自动删除它
                   * arguments:其它参数
                   */
                  channel.queueDeclare(QUEUE_NAME, true, false, false, null);
      
                  //2、创建消费者,并重写如何消费的方法,eg:输出消息
                  //3、首先从信道里面获取数据
                  Consumer consumer = new DefaultConsumer(channel) {
                      /**
                       * 消费者接收消息调用此方法
                       * @param consumerTag   消费者的标签,在channel.basicConsume()去指定
                       * @param envelope  消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志
                       * (收到消息失败后是否需要重新发送)
                       * @param properties
                       * @param body
                       * @throws IOException
                       */
                      @Override
                      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                              throws IOException {
                          String message = new String(body, "UTF-8");
                          LOGGER.info("收件人---收到消息:{}", message);
                      }
                  };
      
                  /**
                   * 4、收到了消息后,提示信道已经收到消息了。可以继续发送其它消息
                   * 【注】第二个参数autoAck如果为false,那么消息会一直保存在RabbitMQ服务器中,Unacked
                   * 消费者没有确认消息被消费,消息一直留在队列中,只有当从有新的消费者加入时,消息被分发到新的消费者。
                   */
                  channel.basicConsume(QUEUE_NAME, true, consumer);
              } catch (Exception e) {
                  e.printStackTrace();
              }
          }
      }
      
      
    四、几种情况运行与分析
    1、分别启动生产者服务和消费者服务
    PC同启1.png PC同启2.png PC同启3.png
    2、关闭生产者服务,开启消费者服务
    P关C启1.png P关C启2.png
    3、关闭消费者服务,开启生产者服务
    P启C关1.png

    该信息处于队列中等待状态,等待消费者消费

    P启C关2.png P启C关3.png
    4、服务都保持启动
    • 设置autoAck参数为false

      /**
       * 4、收到了消息后,提示信道已经收到消息了。可以继续发送其它消息
       * 【注】第二个参数autoAck如果为false,那么消息会一直保存在RabbitMQ服务器中
       * 消费者没有确认消息被消费,消息一直留在队列中,只有当从有新的消费者加入时,消息被分发到新的消费者。
       */
      channel.basicConsume(QUEUE_NAME, false, consumer);
      
      PC同启_autoAck1.png
    • 生产者发送多条信息(此处我发出五条消息)


      PC同启_autoAck2.png

    相关文章

      网友评论

        本文标题:Rabbitmq打怪升级之路(六)生产者与消费者模型

        本文链接:https://www.haomeiwen.com/subject/sznthctx.html