介绍
RabbitMQ是一个消息代理:它接收并转发信息。
术语
生产者Producter
发送消息的程序被称为生产者。
队列Queue
RabbitMQ内部类似于邮箱的部分称为队列。虽然消息流经RabbitMQ和你的应用程序,但是它们只能被存储于队列中。队列仅受主机的存储器和磁盘大小的限制,它本质上是一个大的消息缓冲器。许多生产者可以向同一个队列发送消息,许多消费者也可以尝试从同一个队列接收消息。
消费者Consumer
接收消息的程序被称为消费者。
producter,consumer和broker可以不再同一台主机上;
一个应用程序也可以同时是生产者和消费者。
Hello World教程(Java)
这一部分的教程会写两个Java程序:一个生产者,用来发送一条信息;一个消费者,用来接收信息并将其打印出来。这条信息就是“Hello World”。
Java Client 库
RabbitMQ有多种协议。本教程中使用的是 AMQP 0-9-1,它是一种开放的通用信息传递协议。RabbitMQ有许多不同语言的客户端,我们将使用其提供的Java客户端。
下载客户端库及其依赖(SLF4J API and SLF4J Simple)。将这些jar包及Java教程文件复制到你的工作目录中。
注意:对于本教程而言SLF4J Simple已经足够了,但是在生产过程中应该使用成熟的日志记录库,例如Logback。
RabbitMQ Java客户端也存在于Maven的中央仓库中:
其groupId为com.rabbitmq
,artifactId为amqp-client
现在有了Java client及其依赖项,可以开始编写代码了。其中消息发送者(即生产者)称为Send
;消息接收者(即消费者)称为Revc
。
发送
https://github.com/rabbitmq/rabbitmq-tutorials/blob/master/java/Send.java
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
public class Send {
// 设置队列名
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
// 创建到服务器的连接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
}
}
}
1.创建到服务器的连接。
connection抽象了socket connection,负责协议版本协商和认证等工作。这里我们连接到本地计算机上的代理--localhost。如果想连接其他计算机上的代理只需指定其host或者IP地址。
2.创建一个通道。
channel是用于完成任务的大多数API所在的位置。注意,此处使用的是try-with-resources语句。因为Connection
和Channel
都实现了java.io.Closeable
接口。这样我们就不用再代码中显示的关闭资源了。
3.在try-with-resources语句中将消息发送至队列。
声明队列是幂等的,只有当队列不存在时才会被创建。消息内容是一个字节数组,所以可以在里面编码任何内容。
发送未成功
如果这是你第一次使用RabbitMQ并且没有看见已发送的消息,那么您可能会挠头想知道哪里可能出错。
可能是代理在没有足够的磁盘空间下启动了(默认情况下,至少需要200MB的可用空间),因此拒绝接收消息。可以通过检查代理的日志文件来确认,并且如果有必要的话可以减少限制。配置文件文档会教你如何设置disk_free_limit
。
接收
https://github.com/rabbitmq/rabbitmq-tutorials/blob/master/java/Recv.java
与生产者不同,生产者只是发送单独的一条消息,消费者需要一直监听RabbitMQ的消息,所以需要保证它一直运行监听消息并打印出来。
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
public class Recv {
// 设置与生产者相同的队列名
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
}
}
1.打开一个连接和一个通道,声明消费消息的队列(与生产者设置的队列名相同)
在此处声明队列是因为可能消费者在生产者之前启动,
在尝试消费队列中的消息之前需要确保这个队列一定是存在的。
2.此处不使用try-with-resource语句自动关闭通道和链接:
因为我们希望在消费者异步侦听消息到达时,进程保持活动状态。
3.需要告诉服务器将队列中的消息传递给我们
由于它将异步的向我们推送消息,所以以对象的形式提供了一个回调,该回调会为我们缓冲消息,直到准备使用为止。上述就是DeliverCallback
子类所做的。
发送&接收
1.编译--可以仅使用classpath上的RabbitMQ Java Client来编译这两个类:
javac -cp amqp-client-5.7.1.jar Send.java Recv.java
2.运行--在classpath中需要rabbitmq-client.jar及其依赖。在终端中运行他们:
java -cp .:amqp-client-5.7.1.jar:slf4j-api-1.7.26.jar:slf4j-simple-1.7.26.jar Recv
java -cp .:amqp-client-5.7.1.jar:slf4j-api-1.7.26.jar:slf4j-simple-1.7.26.jar Send
网友评论