美文网首页
(翻译)RabbitMQ官网教程(1)HelloWorld

(翻译)RabbitMQ官网教程(1)HelloWorld

作者: zhangyaqi | 来源:发表于2020-08-06 00:08 被阅读0次

    原文链接:https://www.rabbitmq.com/tutorials/tutorial-one-java.html

    简介


    RabbitMQ是一个消息中间件:负责接收消息以及转发消息。可以将RabbitMQ想象为一个邮局:当你将想要寄的信投入邮箱时,你确信邮递员最终会将信派发给你的收件人。大同小异,RabbitMQ就像是邮箱、邮局、邮递员。

    RabbitMQ与邮局最大的不同就是,RabbitMQ不处理纸张,而是接收、存储、转发二进制消息数据。

    RabbitMQ及消息传递通用的一些术语:


    • Producing指的是发送消息,用于发送消息的程序被叫做Producer
    • Queue指的是RabbitMQ中的邮箱。尽管消息会流经RabbitMQ和应用,但消息只能被存储在Queue中。Queue只受主机内存和硬盘的限制,它实际上是一个大的消息缓存。多个Producer可以向同一个Queue发送消息,多个Consumer同样可以从同一个Queue中接收消息。
    • Consuming指的是接收消息,主要用来接收消息的程序被被叫做Consumer。

    注意:Producer、Consumer、Broker可以部署在不同的主机上;同一个应用程序既可以是生产者也可以是消费者。

    HelloWorld(Java版)


    这里我们会写两个Java程序:一个用于发送消息的Producer,一个用于接收并打印消息的Consumer。我们不会关注JavaAPI的细节,而是专注于"HelloWorld"这个简单程序。

    下图中,"P"表示Producer,"C"表示Consumer,中间的方格表示Queue。

    Java客户端依赖包

    RabbitMQ支持多种协议。我们这里采用AMQP协议(一个用于消息传递的开放通用协议)。RabbitMQ提供了多种语言的客户端,这里我们使用RabbitMQ提供的Java客户端。

    下载客户端依赖包, 然后将依赖包添加到你的程序文件下。该客户端会依赖(SLF4J APISLF4J Simple).

    需要注意的是,SLF4J Simple对于本教程来说已经足够了,但是您应该在生产环境中使用像Logback这样成熟的日志库。

    RabbitMQ的Java客户端依赖包也收录在Maven的中央仓库中,也可以用下边的方式引入该依赖包。

    <dependency>
       <groupId>com.rabbitmq</groupId>
       <artifactId>amqp-client</artifactId>
       <version>5.9.0</version>
    </dependency>
    

    发送消息


    我们将消息的Producer的类命名为Send,消息的Consumer的类命名为Recv。Producer会连接到RabbitMQ并发送一条消息,然后退出。

    在Send.java中,需要引入的下面的依赖类:

    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.Channel;
    

    设置Java类并声明队列名:

    public class Send {
      private final static String QUEUE_NAME = "hello";
      public static void main(String[] argv) throws Exception {
          ...
      }
    }
    

    创建与RabbitMQ的连接:

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    try (Connection connection = factory.newConnection();
         Channel channel = connection.createChannel()) {
    
    }
    

    这里的Connection是一个抽象的Socket Connection,主要解决协议版本处理、授权等问题。我们连接到的是本地RabbitMQ节点。如果想连接其他的节点,只需要指定主机名或IP地址即可。

    然后我们创建了一个channel,我们用到的大部分API都在channel中。由于Connection和Channel都实现了Closeable接口,所以我们可以使用try-catch-with的写法,这样就不用再显式关闭资源。

    发送消息前,我们需要先声明一个消息队列;然后再将消息发布到消息队列中。

    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    String message = "Hello World!";
    channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
    System.out.println(" [x] Sent '" + message + "'");
    

    发送和接收消息时定义的消息队列名必须保持一致。如果队列不存在,则创建新的队列。消息体是一个byte数组,所以你可以编码任何类型的内容。

    Send.java完整代码:

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    public class Send {
        private static final String QUEUE_NAME = "hello";
    
        public static void main(String[] args) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
    
            try (Connection connection = factory.newConnection();
                 Channel channel = connection.createChannel()) {
    
                channel.queueDeclare(QUEUE_NAME, false, false, false, null);
                String message = "Hello World!";
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
                System.out.println(" [x] Sent '" + message + "'");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    

    Receiving


    Consumer监听来自RabbitMQ的消息。与上边仅仅发送单条消息的Producer不同,Consumer将一直运行以此来监听并打印消息。

    Recv与Send的代码相似:

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.DeliverCallback;
    

    我们使用一个特殊的接口DeliverCallback来缓存服务端推送过来的消息。

    设置方法与Producer类似:打开一个Connection、Channel,然后声明将要消费的queue。需要注意的是这里的queue需要与Producer中声明的queue保持一致。

    public class Recv {
    
      private final static String QUEUE_NAME = "hello";
    
      public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
    
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    
      }
    }
    

    注意我们这里也声明了queue。那是因为我们可能会在启动Producer之前就先启动Consumer,这样我们可以确保从queue中消费消息时该queue已经存在。

    为什么这里我们没有用try-catch-with的方式来自动关闭channel和connection呢?如果采用这种方式的话就可以让程序简单的运行、关闭资源然后退出。但真这样的做的话就会很尴尬,因为我们希望的是Consumer异步监听消息到来时进程一直保持活跃。

    我们将告诉服务端把Queu中的消息传给我们。由于服务端会异步将消息推送给我们,所以我们需要提供一个对象形式的回调,该对象会将消息缓存起来直到我们使用消息。这就是DeliverCallback的作用。

    DeliverCallback deliverCallback = (consumerTag, delivery) -> {
        String message = new String(delivery.getBody(), "UTF-8");
        System.out.println(" [x] Received '" + message + "'");
    };
    channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
    

    Recv.java完整代码:

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.DeliverCallback;
    
    public class Receiver {
        private static final String QUEUE_NAME = "hello";
    
        public static void main(String[] args) throws Exception {
    
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
    
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
    
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
            };
            channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {
            });
    
        }
    }
    

    将Send和Recv一块运行

    在类路径下引用RabbiMQ的客户端依赖包编译这两个Java文件:

    javac -cp amqp-client-5.7.1.jar Send.java Recv.java

    运行程序的时候需要应用类路径下的rabbitmq-client.jar 、slf4j-api-1.7.26.jar、slf4j-simple-1.7.26.jar ,在终端运行Consumer:

    java -cp .:amqp-client-5.7.1.jar:slf4j-api-1.7.26.jar:slf4j-simple-1.7.26.jar Recv

    之后再运行Producer:

    java -cp .:amqp-client-5.7.1.jar:slf4j-api-1.7.26.jar:slf4j-simple-1.7.26.jar Send

    在WIndows系统中运行的话,需要将冒号『:』替换为分号『;』

    当程序全部运行的时候,Consumer会把从RabbitMQ中获取到的Producer的消息打印出来。Consumer将会一直等待消息。

    Queue清单

    你或许希望看到RabbitMQ都有哪些queue以及queue中有多少消息。你可以 使用rabbitmqctl来查看这些信息:

     sudo rabbitmqctl list_queues
    

    在Windows中对应的命令:

    rabbitmqctl.bat list_queues
    

    在第二章中,我们将创建一个简单的工作队列。

    提示
    为了方便输入,你可以给用到的依赖包设置一个环境变量,比如:

     export CP=.:amqp-client-5.7.1.jar:slf4j-api-1.7.26.jar:slf4j-simple-1.7.26.jar
     java -cp $CP Send
    

    Windows系统的话:

    set CP=.;amqp-client-5.7.1.jar;slf4j-api-1.7.26.jar;slf4j-simple-1.7.26.jar
    java -cp %CP% Send
    

    相关文章

      网友评论

          本文标题:(翻译)RabbitMQ官网教程(1)HelloWorld

          本文链接:https://www.haomeiwen.com/subject/vdparktx.html