Federation 插件的目标是使 RabbitMQ 在不同的 Broker 节点之间进行消息传递而无须建立集群,该功能在很多场景下都非常有用:
-
Federation 插件能够在不同管理域(可能设置了不同的用户和 vhost ,也可能运行在不同版本的 RabbitMQ 和Erlang 上)中的Broker 或者集群之间传递消息。
-
Federation 插件基于AMQP 0-9-1 协议在不同的Broker 之间进行通信,并设计成能够容忍不稳定的网络连接情况。
-
一个Broker 节点中可以同时存在联邦交换器(或队列)或者本地交换器(或队列),只需要对特定的交换器(或队列)创建Federation 连接CFederation link ) 。
-
Federation 不需要在 N 个Broker 节点之间创建O(N2)个连接(尽管这是最简单的使用方式) ,这也就意味着Federation 在使用时更容易扩展。
一个联邦交换器(federated exchange)或者一个联邦队列(federated queue) 接收上游(upstream) 的消息,这里的上游是指位于其他 Broker 上的交换器或者队列。联邦交换器能够将原本发送给上游交换器(upstream exchange)的消息路由到本地的某个队列中;联邦队列则允许一个本地消费者接收到来自上游
队列C(upstream queue)的消息。
联邦交换器
联邦交换器.PNG场景描述:broker1 部署在北京,broker2 部署在上海,broker3 部署在广州。
在 broker3 中为交换器 exchangeA (broker3 中的队列 queueA 通过 "rkA" 与 exchangeA 进行了绑定)与北京的 broker1 之间建立一条单向的 Federation link。此时 Federation 插件会在 broker1 上会建立一个同名的交换器 xchangeA (这个名称可以配置,默认同名),同时建立一个内部的交换器 "exchangeA→ broker3 B ",并通过路由键 "rkA" 将这两个交换器绑定起来。
部署在北京的业务 ClientB 可以连接 broker1 并向 exchangeA 发送消息,这样 ClientB 可以迅速发完消息并收到确认消息,而之后消息会通过 Federation link 转发到 broker3 的交换器 exchangA 中。最终消息会存入与 exchangeA 绑定的队列 queueA 中,消费者可以消费队列 queueA 中的消息。
经过 Federation link 转发的消息会带有特殊的 headers 属性标记。
联邦队列
联邦队列可以在多个 Broker 节点(或集群)之间为单个队列提供负载均衡的功能。一个联邦队列可以连接一个或者多个上游队列(upstream queue),并从这些上游队列中获取消息以满足本地消费者消费消息的需求。
联邦队列.PNG上图队列 queue1 和 queue2 原本在 broker2 中,由于某种需求将其配置为 federated queue 并将broker1 作为 upstream。Federation 插件会在 broker1 上创建同名的队列 queue1 和 queue2 ,与 broker2 中的队列queue1和 queue2 分别建立两条单向独立的 Federation link。
此时的消费方式如下:
-
当有消费者 ClinetA 连接 broker2并通过 Basic.Consume 消费队列 queue1 (或queue2) 中的消息时,如果队列queue1 (或queue2)中本身有若干消息堆积,那么ClientA 直接消费这些消息,此时 broker2 中的queue1 (或queue2 ) 并不会拉取 broker1中的queue1 (或queue2 ) 的消息。
-
如果队列 queue1(或 queue2)中没有消息堆积或者消息被消费完了,那么它会通过 Federation link 拉取在 broker1 中的上游队列 queue1(或者 queue2)中的消息(如果有消息),然后存储到本地,之后再被消费者 ClientA 进行消费。
上图描述,如果队列 queue1 中有消息堆积,消费者连接 broker3 消费消息 queue3 中的消息,无论 queue3 处于何种状态,这些消费者都消费不到 queue1 中的消息,除非 queue2 有消费者。
上图中的 broker2 的队列 queue 没有消息堆积或者消息被消费完之后并不能通过 Basic.Get 来获取 broker1 中队列 queue 的消息。因为 Basic.Get 是一个异步的方法,如果要从 broker1 中队列 queue 拉取消息,必须要阻塞等待通过 Federation link 拉取消息存入 broker2 中的队列 queue 之后再消费消息,所以对于 federation queue 而言只能使用 Basic.Consume 进行消费。
federation queue 并不具备传递性。
Federation 的使用
为了能够使用 Federation 功能,需要配置以下 2 个内容:
(1)需要配置一个或多个 upstream ,每个 upstream 均定义了到其他节点的 Federation link。这个配置可以通过设置运行时的参数( Runtime Parameter ) 来完成,也可以通过 federation management 插件来完成。
(2)需要定义匹配交换器或者队列的一种/多种策略(Policy)。
rabbitmq-plugins enable rabbitmq_federation
命令可以开启 Federation 功能。Federation 内部基 AMQP 协议拉取数据,所以在开启 rabbitmq federation 插件的时候,默认会开启 amqp_client 插件。
rabbitmq-plugins enable rabbitmq_federation_management
开启 Federation 的管理插件。开启成功后可以在 RabbitMQ 的管理界面中 "Admin" 中看到关于 Federation 的 Tab 页。
rabbitmq_federation_management 插件依附于 rabbitmq_management 插件,所以开启rabbitmq_federation_management 插件的同时默认也会开启rabbitmq_management 插件。
注意:当需要在集群中使用 Federation 功能的时候,集群中所以的节点都应该开启 Federation 插件。
在 Federation 中存在 3 种级别的配置。
(1)Upstreams:每个 upstream 用于定义与其他 Broker 建立连接的信息。
(2)Upstreams Sets:每个 upstream set 用于对一系列使用 Federation 功能的 upstream 进行分组。
(3)Policies:每一个 Policy 会选定出一组交换器,或者队列,亦或者两者皆有而进行限定,进而作用于一个单独的 upstream 或者 upstream set 之上。
建立 federated exchange 示例:
(1)需要在上下游的 broker 中开启 rabbitmq federation 插件,最好同时开启 rabbitmq federation managemen 插件。
(2)在 broker 中定义一个 upstream (注意:该 broker 作为下游接收消息)。
add_federation_upstream.PNG-
Name:定义这个 upstream 的名称。
-
URI:定义 upstream 的 AMQP 连接,如
amqp://root:root123@192.168.0.2:5672
-
Prefetch count:定义 Federation 内部缓存的消息条数,即在收到上游消息之后且在发送到下游之前缓存的消息条数。
-
Reconnect delay
-
Acknowledgement Mode:定义 Federation link 的消息确认方式。
-
Trust User-ID:设定 Federation 是否使用 “Validated User-ID” 这个功能。
(3)定义一个 Policy 用于匹配交换器,并使用第二步中所创建的 upstream。
add_federation_policy.PNG(4)效果图观察
- 上游
标注的 exchange 自动创建。
- 下游
当消息发送到上游的交换器后,上游交换器接收消息并转发到下游交换器中。
测试代码:
- RabbitMQSender
public class RabbitMQSender {
private final static String EXCHANGE_NAME = "federation.exchange1";
private final static String DUHFMQ02 = "10.225.20.237";
public static void main(String[] args) {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setPort(5672);
factory.setUsername("jiaflu");
factory.setPassword("123456");
factory.setVirtualHost("/vhost_jiaflu");
// 创建连接
try {
Address[] address = new Address[]{new Address(DUHFMQ02)};
Connection connection = factory.newConnection(address);
// 创建信息管道
Channel channel = connection.createChannel();
// exchange
channel.exchangeDeclare(EXCHANGE_NAME, "topic", true, false, false, null);
String message;
for (int i = 0; i < 10; i++) {
message = System.currentTimeMillis() + " hello " + i;
System.out.println(i + " send " + message);
channel.basicPublish( EXCHANGE_NAME, "federation.exchange.test", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
Thread.sleep(30);
}
channel.close();
connection.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e){
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
- RabbitMQReceiver
public class RabbitMQReceiver {
private final static String QUEUE_NAME = "federation.exchange.queue1";
private final static String EXCHANGE_NAME = "federation.exchange1";
private final static String DUHFMQ = "10.224.162.189";
public static void main(String[] args) {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setPort(5672);
factory.setUsername("jiaflu");
factory.setPassword("123456");
factory.setVirtualHost("/vhost_jiaflu");
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(2);
// 创建连接
try {
Address[] address = new Address[]{new Address(DUHFMQ)};
Connection connection = factory.newConnection(address);
// 创建信息管道
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// exchange
channel.exchangeDeclare(EXCHANGE_NAME, "topic", true, false, false, null);
// bind
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "federation.exchange.test");
System.out.println("Queue Receiver Start!");
// 定义一个消费者
Consumer consumer = new DefaultConsumer(channel) {
// 消息到达 触发方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "utf-8");
System.out.println("Recv msg: " + msg);
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
boolean autoAck = true;
while (true) {
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
break;
}
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e){
e.printStackTrace();
}
}
}
小结
本来由于公司需要需要在两个 clutser 之间通过 Federation 插件来实现 HA(高可用),采取了双向的 federation exchange 来实现两个 cluster 之间的消息复制,实现了消息的同步复制,但没有实现两个 cluster 之间消息消费的同步。
网友评论