直接切入正题,为了统一 Queue 和 Stream 这两种类型的 MQ,pulsar 在 consumer 之前抽象了一层订阅层来处理这个事情。截至目前版本 2.3.1,pulsar 支持三种类型的订阅模型,其中 Exclusive 和 Failover 属于 Stream 类型的,Shared 属于 Queue 类型。
在上一篇文章 Pulsar 与 Kafka 消费模型对比 中,对比了 Pulsar 与 Kafka 在 Failover 场景下,二者之间的逻辑处理。那么在 Failover 场景下,Pulsar 是如何进行主从切换,并调度 consumer 来进行工作的呢?这篇文章,深入 Pulsar 的源码级别为大家揭开 Failover 的神秘面纱。
在开始分析之前,大致梳理一下 Pulsar 的代码结构,pulsar 整体分为 client 和 server 两大块,client 接口相关的代码在 pulsar-client-api
下,具体的实现在 pulsar-client
下。server 端的代码主要在 pulsar-broker
下。client 和 server 分别有一个主要的实现类:ClientCnx
和 ServerCnx
。client 与 server 之间的交互采用的是 protobuf 的协议,具体内容在 pulsarApi.proto
文件中,二者公用的代码在 pulsar-common
下。
整个 Failover 的调度策略主要由 pulsar 的 broker 完成,当 client 接收到 broker 发送过来状态改变的信息时,做出相应的处理。
client
在 pulsar-client-api
下,定义了一个 ConsumerEventListener
的接口,主要用来监听 consumer 的状态信息。它有两个方法 becameActive
和 becameInactive
,也就是当接收到状态改变时,client 根据状态信息,将当前的 consumer 置为相应的状态,即该 consumer 是否继续对外提供服务。
public interface ConsumerEventListener extends Serializable {
void becameActive(Consumer<?> consumer, int partitionId);
void becameInactive(Consumer<?> consumer, int partitionId);
}
client 端处理的逻辑主要在 ClientCnx
中的 handleActiveConsumerChange()
,它首先会从 server 端获取当前 consumer 的状态信息,拿到状态改变的指令时,通过activeConsumerChanged()
来判定,当前 consumer 是否还可以继续对外提供服务。
handleActiveConsumerChange :
protected void handleActiveConsumerChange(CommandActiveConsumerChange change) {
ConsumerImpl<?> consumer = consumers.get(change.getConsumerId());
if (consumer != null) {
consumer.activeConsumerChanged(change.getIsActive());
}
}
activeConsumerChanged:
void activeConsumerChanged(boolean isActive) {
if (consumerEventListener == null) {
return;
}
listenerExecutor.execute(() -> {
if (isActive) {
consumerEventListener.becameActive(this, partitionIndex);
} else {
consumerEventListener.becameInactive(this, partitionIndex);
}
});
}
proto API
client 与 server 之间的交互信息定义在 CommandActiveConsumerChange
中,具体如下:
// changes on active consumer
message CommandActiveConsumerChange {
required uint64 consumer_id = 1;
optional bool is_active = 2 [default = false];
}
server
broker 的相关实现先从 Topic
提供的 subscribe
接口看起,因为当 server 端接收到一个订阅信息时,会先从这里做出相应的处理。subscribe
分别被 PersistentTopic
和 NonPersistentTopic
实现,我们以 PersistentTopic
为例。
- 在
PersistentTopic
场景下,当接收到一个订阅信息时,它会做一系列的判断(这个不是本文的重点,在这里忽略),当所有的判断成功之后,说明当前的订阅请求符合创建一个 consumer 的逻辑,broker 会根据用户提供的信息,创建一个 consumer,并调用subscription.addConsumer(consumer)
将当前创建的 consumer 加入进来。
subscriptionFuture.thenAccept(subscription -> {
try {
ledger.checkBackloggedCursors();
Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel, consumerName,
maxUnackedMessages, cnx, cnx.getRole(), metadata, readCompacted, initialPosition);
subscription.addConsumer(consumer);//划重点
当有新的 consumer 加入之后,broker 首先会去更新 cursor 的状态信息,更新 cursor 的信息之后会去判断又是一系列的判断。
注意:更新 cursor 的状态信息需要是线程安全的操作,所以在这里,程序会去判断,在当前更新 cursor 之后,是否有别的线程更新过 cursor 信息,如果更新过,IS_FENCED_UPDATER.get(this) == TRUE
,代码如下:
cursor.updateLastActive();
if (IS_FENCED_UPDATER.get(this) == TRUE) {
log.warn("Attempting to add consumer {} on a fenced subscription", consumer);
throw new SubscriptionFencedException("Subscription is fenced");
}
-
subscription.addConsumer(consumer)
会去调用dispatcher.addConsumer(consumer)
dispatcher.addConsumer(consumer)
是一个接口,总共有 5 个类实现了该接口:
先来看 AbstractDispatcherSingleActiveConsumer
的逻辑,又是一系列的判断操作:
- 判断当前的 Dispatcher 是否已经被关闭,关闭的话,将当前的 consumer 与 broker 断开连接;
- 如果订阅类型为 Exclusive 且 当前 consumer 列表不为空的话,说明当前已经有 consumer 在订阅这个 topic,在 Exclusive 场景下,同一时刻只能有一个 consumer 订阅该 topic,所以这里会直接抛出
ConsumerBusyException
异常。 - 判断当前的 consumer 是否达到了用户设置的最大 consumer 数,达到的话,会直接抛出
ConsumerBusyException
异常。 - 如果订阅类型为 Failover 且达到了 consumer 的最大数,会直接抛出
ConsumerBusyException
异常。
如果上述条件都不满足,说明该 consumer 可以加入到当前的 consumers 列表中。截至目前,用户想要添加的 conusmer 才被添加进来,当收到用户要添加的 consumer 之后,会去判断 pickAndScheduleActiveConsumer
, 如果不满足,通过 ACTIVE_CONSUMER_UPDATER
获取当前处于活跃状态的 consumer 信息,如果当前活跃的 consumer 为 null,退出。否则,调用 consumer 的 notifyActiveConsumerChange
进行通知,它会调用 Commands.newActiveConsumerChange
,这个函数是真正与 client 端建立连接,通知 conusmer 状态的信息。
consumers.add(consumer);
if (!pickAndScheduleActiveConsumer()) {
// the active consumer is not changed
Consumer currentActiveConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
if (null == currentActiveConsumer) {
if (log.isDebugEnabled()) {
log.debug("Current active consumer disappears while adding consumer {}", consumer);
}
} else {
consumer.notifyActiveConsumerChange(currentActiveConsumer);
}
}
网友评论