美文网首页
RabbitMQ翻译 Part1

RabbitMQ翻译 Part1

作者: 野餐先生 | 来源:发表于2017-06-01 17:54 被阅读40次

    介绍

    RabbitMQ是一种消息中间件。它的核心思想很简单,接收并传递消息。你可以把RabbitMQ想象成邮局:当你把信扔进信箱后,你十分确信邮递员会准确的把信交给收件人。在这个比喻里,RabbitMQ就是邮箱、邮局和邮递员的集合。
    RabbitMQ和邮局主要的区别在于,RabbitMQ处理的不是纸质邮件,而是二进制的数据(Messages)

    接下来用较为专业的术语解释RabbitMQ以及消息传递。

    Producing指的是只做发送操作,其余什么也不干。自然而然,Producer就是只发消息的程序。我们用P指代Producer。

    image

    Queue等同于邮箱的意思,它存在于RabbitMQ当中。当消息穿过RabbitMQ到达你的应用程序期间,它们全部都保存在Queue当中。Queue的大小没有限制,你想存多少就存多少-它基本等同于一个容量无限的缓存。大量Producer往里发送消息,大量Consumer从同一个队列里取消息。我们用个图来展示下。

    image

    Consuming与接收的含义非常接近。Consumer这类程序主要功能就是接收消息。我们画个C。

    image

    注意Producer,Consumer以及Broker可能不在同一台主机中;实际大多数情况下,它们都分布在不同的主机中。

    "Hello World"

    Using the Java Client

    在这部分内容中,我们会写两个JAVA程序;
    第一个如下:
    Producer发送一条消息,Consumer接收消息并打印。让我们暂时忽视JAVA API的实现细节,集中注意力从简单的调用开始,发一条“Hello World”的消息。
    在下方的图中,“P”代表Producer,“C”代表Consumer,中间的红色方块代表Queue(Rabbit为Consumer持有的消息缓存)

    image

    Java版RabbitMQ客户端

    RabbitMQ使用多种协议。在本部分示例中,采用的是AMQP 0-9-1协议,它是一种开放的、多功能的消息传递协议。同时,RabbitMQ客户端的实现语言也种类繁多,在这里我们选用JAVA版本。
    下载安装包,检查签名,解压到你指定的路径 巴拉巴拉~~~
    现在我们开始写代码。

    Sending

    image

    我们称消息发送者为Send,接收者为Recv。Send会连接RabbitMQ,发送一条简短的消息,然后退出。

    我们需要导入以下class文件

    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.Channel;
    

    建立类文件并且命名队列。

    public class Send {
      private final static String QUEUE_NAME = "hello";
    
      public static void main(String[] argv)
          throws java.io.IOException {
          ...
      }
    }
    

    然后我们创建一个到服务器的连接:

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
    

    Connection掩盖了套接字实现的细节,让我们能专注于选择协议版本和认证以及其他重要的事情上。我们连接本机的broker,下文中我们简称为localhost。我们只需要修改IP地址就能简单的连上其他主机上的broker。

    接下来我们创建一个channel,它包含大量我们常用的API。

    为了发送消息,我们需要声明一个队列;然后我们向队列中发送消息。

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        String message = "Hello World!";
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");
    

    声明队列是幂等的,只有当队列不存在时,它才会被创建(吐槽,不就是单例嘛。。。)。消息内容是字节数组,你可以随心所以编码它。

    最后,别忘记关掉连接。

        channel.close();
        connection.close();
    

    以上就是整个Send.java的内容。

    发送无效怎么办?
    如果你第一次使用RabbitMQ并且没有看见发送的消息,你肯定会对这种不知所措的感觉印象深刻。也许是broker没有足够的磁盘空间(默认需要1G)导致拒绝接收消息。通过检查broker的日志文件来判断是否需要降低磁盘需求。 configuration file documentation会教你怎样设置disk_free_limit。

    Receving

    接下来是reciver,它被RabbitMQ塞入消息。同时,reciver实现起来也比sender复杂,我们需要它监听消息,直到接收并打印出来。

    image

    像写sender一样,这里我们也需要引入很多依赖的代码。

    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Consumer;
    import com.rabbitmq.client.DefaultConsumer;
    

    引入的代码中,DefaultConsumer是一个实现了Consumer接口的类,我们会用它来保存RabbitMQ推送的消息。
    类似Sender一样构建代码;我们打开Connection和Channel,声明将要消费的队列。注意队列名称需要与发送的队列相同。

    public class Recv {
      private final static String QUEUE_NAME = "hello";
    
      public static void main(String[] argv)
          throws java.io.IOException,
                 java.lang.InterruptedException {
    
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
    
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        ...
        }
    }
    

    注意,我们在这里也许要声明队列,是因为我们也许在启动sender前就启动了reciver。我们需要确保在消费消息前,队列已经存在。
    接下来我们要告诉RabbitMQ,让它把队列中的消息发给我们。由于它通过异步的方式推送消息,我们在形式上先用一个变量保存消息直到我们实际使用它。这也是DefaultConsumer子类所做的工作。

        Consumer consumer = new DefaultConsumer(channel) {
          @Override
          public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
              throws IOException {
            String message = new String(body, "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
          }
        };
        channel.basicConsume(QUEUE_NAME, true, consumer);
    

    以上就是整个Recv.java的代码。

    Putting it all together

    你可以将这两段代码同rabbitMQ客户端代码一起编译。

    $ javac -cp rabbitmq-client.jar Send.java Recv.java
    

    为了运行他们,你需要rabbitmq-client.jar并且它依赖于classpath。在终端上,运行sender:

    $ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Send
    

    然后,运行receiver;

    $ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Recv
    

    在Windows环境下,在classpath中用分号代替冒号分离它们
    当receiver从RabbitMQ中获取消息后会将其打印出来。recevier会一直运行并等待新的消息,因此我们在另一个终端中启动sender。

    如果你想检查队列,尝试使用 rabbitmqctl list_queues

    hello

    是时候看看Part2,构建一个简单的工作队列。

    小秘密:
    为了减少打字,你可以为classpath设置环境变量,例如

    $ export CP=.:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
     $ java -cp $CP Send
    

    windows环境下

     > set CP=.;commons-io-1.2.jar;commons-cli-1.1.jar;rabbitmq-client.jar
     > java -cp %CP% Send
    

    相关文章

      网友评论

          本文标题:RabbitMQ翻译 Part1

          本文链接:https://www.haomeiwen.com/subject/prbefxtx.html