原文链接:https://www.rabbitmq.com/tutorials/tutorial-one-java.html
简介
RabbitMQ是一个消息中间件:负责接收消息以及转发消息。可以将RabbitMQ想象为一个邮局:当你将想要寄的信投入邮箱时,你确信邮递员最终会将信派发给你的收件人。大同小异,RabbitMQ就像是邮箱、邮局、邮递员。
RabbitMQ与邮局最大的不同就是,RabbitMQ不处理纸张,而是接收、存储、转发二进制消息数据。
RabbitMQ及消息传递通用的一些术语:
- Producing指的是发送消息,用于发送消息的程序被叫做Producer
- Queue指的是RabbitMQ中的邮箱。尽管消息会流经RabbitMQ和应用,但消息只能被存储在Queue中。Queue只受主机内存和硬盘的限制,它实际上是一个大的消息缓存。多个Producer可以向同一个Queue发送消息,多个Consumer同样可以从同一个Queue中接收消息。
- Consuming指的是接收消息,主要用来接收消息的程序被被叫做Consumer。
注意:Producer、Consumer、Broker可以部署在不同的主机上;同一个应用程序既可以是生产者也可以是消费者。
HelloWorld(Java版)
这里我们会写两个Java程序:一个用于发送消息的Producer,一个用于接收并打印消息的Consumer。我们不会关注JavaAPI的细节,而是专注于"HelloWorld"这个简单程序。
下图中,"P"表示Producer,"C"表示Consumer,中间的方格表示Queue。
Java客户端依赖包
RabbitMQ支持多种协议。我们这里采用AMQP协议(一个用于消息传递的开放通用协议)。RabbitMQ提供了多种语言的客户端,这里我们使用RabbitMQ提供的Java客户端。
下载客户端依赖包, 然后将依赖包添加到你的程序文件下。该客户端会依赖(SLF4J API 和 SLF4J Simple).
需要注意的是,SLF4J Simple对于本教程来说已经足够了,但是您应该在生产环境中使用像Logback这样成熟的日志库。
RabbitMQ的Java客户端依赖包也收录在Maven的中央仓库中,也可以用下边的方式引入该依赖包。
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.9.0</version> </dependency>
发送消息
我们将消息的Producer的类命名为Send,消息的Consumer的类命名为Recv。Producer会连接到RabbitMQ并发送一条消息,然后退出。
在Send.java中,需要引入的下面的依赖类:
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
设置Java类并声明队列名:
public class Send {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
...
}
}
创建与RabbitMQ的连接:
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
}
这里的Connection是一个抽象的Socket Connection,主要解决协议版本处理、授权等问题。我们连接到的是本地RabbitMQ节点。如果想连接其他的节点,只需要指定主机名或IP地址即可。
然后我们创建了一个channel,我们用到的大部分API都在channel中。由于Connection和Channel都实现了Closeable接口,所以我们可以使用try-catch-with的写法,这样就不用再显式关闭资源。
发送消息前,我们需要先声明一个消息队列;然后再将消息发布到消息队列中。
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
发送和接收消息时定义的消息队列名必须保持一致。如果队列不存在,则创建新的队列。消息体是一个byte数组,所以你可以编码任何类型的内容。
Send.java完整代码:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Send {
private static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
} catch (Exception e) {
e.printStackTrace();
}
}
}
Receiving
Consumer监听来自RabbitMQ的消息。与上边仅仅发送单条消息的Producer不同,Consumer将一直运行以此来监听并打印消息。
Recv与Send的代码相似:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
我们使用一个特殊的接口DeliverCallback来缓存服务端推送过来的消息。
设置方法与Producer类似:打开一个Connection、Channel,然后声明将要消费的queue。需要注意的是这里的queue需要与Producer中声明的queue保持一致。
public class Recv {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
}
}
注意我们这里也声明了queue。那是因为我们可能会在启动Producer之前就先启动Consumer,这样我们可以确保从queue中消费消息时该queue已经存在。
为什么这里我们没有用try-catch-with的方式来自动关闭channel和connection呢?如果采用这种方式的话就可以让程序简单的运行、关闭资源然后退出。但真这样的做的话就会很尴尬,因为我们希望的是Consumer异步监听消息到来时进程一直保持活跃。
我们将告诉服务端把Queu中的消息传给我们。由于服务端会异步将消息推送给我们,所以我们需要提供一个对象形式的回调,该对象会将消息缓存起来直到我们使用消息。这就是DeliverCallback的作用。
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
Recv.java完整代码:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
public class Receiver {
private static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {
});
}
}
将Send和Recv一块运行
在类路径下引用RabbiMQ的客户端依赖包编译这两个Java文件:
javac -cp amqp-client-5.7.1.jar Send.java Recv.java
运行程序的时候需要应用类路径下的rabbitmq-client.jar 、slf4j-api-1.7.26.jar、slf4j-simple-1.7.26.jar ,在终端运行Consumer:
java -cp .:amqp-client-5.7.1.jar:slf4j-api-1.7.26.jar:slf4j-simple-1.7.26.jar Recv
之后再运行Producer:
java -cp .:amqp-client-5.7.1.jar:slf4j-api-1.7.26.jar:slf4j-simple-1.7.26.jar Send
在WIndows系统中运行的话,需要将冒号『:』替换为分号『;』
当程序全部运行的时候,Consumer会把从RabbitMQ中获取到的Producer的消息打印出来。Consumer将会一直等待消息。
Queue清单
你或许希望看到RabbitMQ都有哪些queue以及queue中有多少消息。你可以 使用rabbitmqctl来查看这些信息:
sudo rabbitmqctl list_queues
在Windows中对应的命令:
rabbitmqctl.bat list_queues
在第二章中,我们将创建一个简单的工作队列。
提示
为了方便输入,你可以给用到的依赖包设置一个环境变量,比如:export CP=.:amqp-client-5.7.1.jar:slf4j-api-1.7.26.jar:slf4j-simple-1.7.26.jar java -cp $CP Send
Windows系统的话:
set CP=.;amqp-client-5.7.1.jar;slf4j-api-1.7.26.jar;slf4j-simple-1.7.26.jar java -cp %CP% Send
网友评论