原文链接:https://www.rabbitmq.com/tutorials/tutorial-three-java.html
在上一章教程中,我们创建了一个工作队列。之前的工作队列采用的策略是将每条任务消息都直接分发到一个特定的worker。本章内容将对此作出一些改变——我们会将消息分发给多个consumer。这种模式被叫做“发布/订阅”。
为了说明这种模式,我们会创建一个简单的日志系统,这个系统包含两部分的程序:一部分用于发布日志消息,另一部分用于接收并打印日志消息。
在这个日志系统中,每一个运行的日志收集方都会收到日志消息。这样我们就可以用一个Receiver直接将日志日志写入磁盘,与此同时我们还可以用两个Receiver将日志打印到屏幕上。
交换机
在之前的教程中,我们都是直接通过队列来发送和接收消息。现在来介绍下RabbitMQ中的完整消息模型。
首先我们来快速回顾下之前的内容:
- Producer就是一个用来发送消息的应用。
- Queue是存放消息的一个缓存。
- Consumer是一个接收消息的用户应用。
RabbitMQ消息模型的核心思想就是Producer不会直接将消息发送给队列。事实上,多数情况下Producer甚至都不知道消息会被分派到队列中。
Producer只能将消息发送到交换机中。交换机其实很简单,它一边接收Producer发来的消息,另一边将消息推送给队列。交换机必须明确定义如何处理接收到的消息。是应该把消息推送到一个特定的队列?还是应该把消息推送给多个队列?亦或者是应该忽略消息?这些规则都通过交换机类型来进行定义:
image.png这是一些可用的交换机类型:direct、topic、headers以及fanout。我们本章要关注的是最后一种类型——fanout。首先我们来创建一个fanout类型的交换机并将其命名为logs。
channel.exchangeDeclare("logs", "fanout");
fanout类型的交换机其实很简单,顾名思义,它会将接收到的消息广播给所有已知的队列。而这正是我们的日志收集器所需要的。
交换机清单
可以使用rabbitmqctl工具来罗列所有的交换机:
sudo rabbitmqctl list_exchanges
你会看到一些以amq.开头的交换机,这些交换机都是RabbitMQ默认创建的,但当前基本上不会用到。
无名交换机
之前的章节中我们并没有涉及到交换机,但实际上我们能将消息发送给队列是因为我们使用了默认的交换机,我们就是通过空字符串""定义的默认交换机。
回顾一下我们之前发送消息的部分
channel.basicPublish("", "hello", null, message.getBytes());
第一个参数就是交换机的名称,空字符串表示的就是默认交换机或者说是无名交换机:如果routingKey存在的话就将消息推送到匹配的队列中。
现在我们可以给我们的已命名的交换机发送消息了:
channel.basicPublish( "logs", "", null, message.getBytes());
临时队列
你或许还记得我们之前使用的都是有特定名称的队列。能够命名队列对我们来说很重要,我们需要将worker指向同一个队列。当我们需要在producer和consumer中共享信息是命名队列就显得尤为关键。
但是对于我们当前的日志系统来说,这就没那么要紧了。我们需要监听所有的日志消息,而不是其中某一部分。而且我们关心的仅仅是当前最新的消息。为了解决这些问题我需要做两件事:
首先,无论何时连接到RabbitMQ,我们都需要一个全新的空队列。为此我们创建队列的时候需要随机进行命名最好是服务器给我们随机命名一个队列。
其次,一旦我们将Consumer断开链接,对应的队列应该自动被删除。
我们可以使用无参构造函数queueDeclare()来创建具有非持久化、唯一、自动删除特性的命名队列。
String queueName = channel.queueDeclare().getQueue();
想要了解exclusive标识或者queue参数的话,可以查看guide on queues
参数queueName是一个随机的队列名,有点类似于amq.gen-JzTY20BRgKO-HjmUJj0wLg。
绑定
image.png我们已经创建了一个fanout交换机和一个队列。现在我们需要做的就是告诉交换机将消息推送给我们的队列。交换机和队列之间的这种关系就叫做绑定。
channel.queueBind(queueName, "logs", "");
现在logs这个交换机就会将消息推送给我们的队列了。
绑定列表
可以通过下边的命令来罗列现有的绑定:
rabbitmqctl list_bindings
代码合并
image.png这个发送日志消息的producer和我们之前教程中的看起来没什么不同。最大的变化就是现在producer不再把消息直接发送给队列而是发送给交换机。发送消息的时候我们需要提供一个routingKey,但是fanout交换机会忽略这个参数值。下面是我们的EmitLog.java的代码:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class EmitLog {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String message = argv.length < 1 ? "info: Hello World!" :
String.join(" ", argv);
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");
}
}
和你想的一样,创建完连接后我们要声明交换机。这一步是必须的,因为我们无法给一个不存在的交换机推送消息。
如果交换机没有绑定任何队列,消息就会被丢弃,不过这对我们来说没什么问题;如果没有consumer监听消息,我们就可以安全的丢弃该消息。
ReceiveLogs.java代码如下:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
public class ReceiveLogs {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
}
}
网友评论