美文网首页程序员
[用官方文档学习RabbitMQ]——1.RabbitMQ的简单

[用官方文档学习RabbitMQ]——1.RabbitMQ的简单

作者: AceCream佳 | 来源:发表于2017-07-20 20:07 被阅读0次

    最近在研究MQ,考虑用RabbitMQ性价比会高一些。这次学习的途径是看RabbitMQ官方网站的英文文档(好歹CET 4 Boy)。基本上就是把文档的东西翻译过来,加上点自己的理解。代码copy来,尽量把自己的注释写好。有什么错误遗漏还请指点~~~

    简介

    RabbitMQ官方解释是一种消息代理。它可以接受或转发消息。可以用快递来理解,我们把邮件包装好送到快递公司,快递公司将我们发的货物送到收件人的手中。
    这里描述三个角色:Producing、Queue、Consumer
    Producing:生产仅仅意味着发送,发送消息的程序是生产者。

    producer

    Queue:队列可以理解为RabbitMQ里的邮箱,消息虽然会贯穿我们的应用程序和RabbitMQ,但是它们只能被存储在队列中。队列只与主机的内存和磁盘容量绑定,它本质上是一个大型缓冲区。多个生产者可以将大量信息传送到同一个队列中,多个生产者也可以从同一个队列获取数据。

    queue
    Consumer:同样顾名思义,获取消息的程序是消费者
    Consumer

    这里我们要注意,生产者、消费者、代理不必部署在同一台主机上,实际上,大多数应用程序里,他们都是被部署在不同主机上的。
    简单模式
    简单模式,就像它的名字一样很简单。我们只需要两个程序:一个代表Producer,它发送单个消息。另一个代表Consumer,它接受这个消息并且把它打印到控制台。官方使用字符串“Hello World”进行传递。

    简单模式
    这里会用官方提供的Java代码给出示例,在官方的基础上我会加一点注释,属于我个人的一些理解,如有错误还请大家指出来:

    POM.xml:

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
      <groupId>cn.itcast.rabbitmq</groupId>
      <artifactId>itcast-rabbitmq</artifactId>
      <version>0.0.1-SNAPSHOT</version>
    
      <dependencies>
        <dependency>
          <groupId>com.rabbitmq</groupId>
          <artifactId>amqp-client</artifactId>
          <version>3.4.1</version>
        </dependency>
        <dependency>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-log4j12</artifactId>
          <version>1.7.7</version>
        </dependency>
        <dependency>
          <groupId>org.apache.commons</groupId>
          <artifactId>commons-lang3</artifactId>
          <version>3.3.2</version>
        </dependency>
    
        <dependency>
          <groupId>org.springframework.amqp</groupId>
          <artifactId>spring-rabbit</artifactId>
          <version>1.4.0.RELEASE</version>
        </dependency>
    
      </dependencies>
    </project>
    

    Sending程序:

    Sending程序
    public class Send {
      //先给队列起名
      private final static String QUEUE_NAME = "hello";
    
      public static void main(String[] argv)throws IOException {
        //创建一个连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置服务地址,这里我的服务是在本地
        factory.setHost("localhost");
        //也可以设置账号信息,比如用户名、密码、virtualHost 这些都可以在management上自己添加、修改。
        //如果不设置用户名、密码和vshot,则自动使用默认的guest。这里和官方相同用guest的
        //factory.setVirtualHost("/{这里填vhost}");
        //factory.setUsername("{这里是账户}");
        //factory.setPassword("{这里是密码}");
        //用连接工厂创建一个连接
        Connection connection = factory.newConnection();
        //利用这个连接,创建一个通道
        Channel channel = connection.createChannel();
        //用这个通道声明(创建)队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //我们要传的消息
        String message = "Hello World!";
        //传送
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");
        //关闭连接和通道
        channel.close();
        connection.close();
      }
    }    
    

    Receiving程序:

    public class Recv {
      //队列的名字
      private final static String QUEUE_NAME = "hello";
    
      public static void main(String[] argv) throws IOException,InterruptedException {
            //创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            //设置服务地址
            factory.setHost("localhost");
            //...
            //用连接工厂获取连接
            Connection connection = factory.newConnection();
            //利用这个连接,获取通道
            Channel channel = connection.createChannel();
            //声明(创建)队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            //打印,证明我们的接受程序打开了
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
            //!!!注意!!!从这里接收消息对于Consumer。写的和官方不一样了,官方的在下面单写一下。
            Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,BasicProperties properties, byte[] body)throws IOException {
              String message = new String(body, "UTF-8");
              System.out.println(" [x] Received '" + message + "'");
            }
          };
          //接收
          channel.basicConsume(QUEUE_NAME, true, consumer);
       }
    }
    

    注意,我们在Receiving程序中也声明了队列,原因就是我们可能会在生产者工作之前启动消费者,所以我们希望能在使用生产者之前确保队列是确实存在的!

    官方使用的是DefaultConsumer,我们也可以使用QueueingConsumer:

            //定义队列的消费者
            QueueingConsumer consumer = new QueueingConsumer(channel);
            //监听队列
            channel.basicConsume(QUEUE_NAME,true,consumer);
            //获取消息
            while (true){
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody());
                System.out.println("[x] Receive'"+message+"'");
            }
    

    这里翻译下官方对于DefaultConsumer的解释:

    DefaultConsumer类实现了Consumer接口,它的作用是将服务器向我们推送的消息进行缓冲。当我们向服务器要求将消息从队列传递给我们,由于推送消息的方式是异步的,所以我们提供一个对象进行回调,它的工作就是对消息进行缓冲,直到我们准备好去使用这些消息的时候为止。我们保持消费者开启的状态,每当生产者发送消息,handleDelivery方法就会回调。

    再说一下QueueingConsumer。QueueingConsumer继承了DefaultConsumer。相比较于DefaultConsumer,它使用起来很方便,它能省去重写handleDelivery的步骤,但官方使用DeafultConsumer就代表比较支持DefaultConsumer这种方式。

    QueueingConsumer有他的弊端:

    QueueingConsumer内部其实是一个LinkBlockingQueue,它将从broker端接受到的信息先暂存到这个LinkBlockingQueue中,然后消费端程序在从这个LinkBlockingQueue中take出消息。试下一下,如果我们不take消息或者说take的非常慢,那么LinkBlockingQueue中的消息就会越来越多,最终会造成内存溢出。

    QueueingConsumer在RabbitMQ流行于3.x版本,但是4.x版本中就Deprecated了,简单说就是不支持使用。
    StackOverflow上有篇文章详细的将两者进行了对比
    [DefaultConsumer vs QueueingConsumer]https://stackoverflow.com/questions/22840247/rabbitmq-java-client-using-defaultconsumer-vs-queueingconsumer/22859778
    所以可以根据具体的情景选择如何取舍~~~

    相关文章

      网友评论

        本文标题:[用官方文档学习RabbitMQ]——1.RabbitMQ的简单

        本文链接:https://www.haomeiwen.com/subject/fzvthxtx.html