美文网首页
Apache Pulsar 之 Failover 调度策略

Apache Pulsar 之 Failover 调度策略

作者: wolf4j | 来源:发表于2019-05-21 19:36 被阅读0次

直接切入正题,为了统一 Queue 和 Stream 这两种类型的 MQ,pulsar 在 consumer 之前抽象了一层订阅层来处理这个事情。截至目前版本 2.3.1,pulsar 支持三种类型的订阅模型,其中 Exclusive 和 Failover 属于 Stream 类型的,Shared 属于 Queue 类型。

在上一篇文章 Pulsar 与 Kafka 消费模型对比 中,对比了 Pulsar 与 Kafka 在 Failover 场景下,二者之间的逻辑处理。那么在 Failover 场景下,Pulsar 是如何进行主从切换,并调度 consumer 来进行工作的呢?这篇文章,深入 Pulsar 的源码级别为大家揭开 Failover 的神秘面纱。

在开始分析之前,大致梳理一下 Pulsar 的代码结构,pulsar 整体分为 client 和 server 两大块,client 接口相关的代码在 pulsar-client-api 下,具体的实现在 pulsar-client 下。server 端的代码主要在 pulsar-broker 下。client 和 server 分别有一个主要的实现类:ClientCnxServerCnx。client 与 server 之间的交互采用的是 protobuf 的协议,具体内容在 pulsarApi.proto 文件中,二者公用的代码在 pulsar-common 下。

整个 Failover 的调度策略主要由 pulsar 的 broker 完成,当 client 接收到 broker 发送过来状态改变的信息时,做出相应的处理。

client

pulsar-client-api 下,定义了一个 ConsumerEventListener 的接口,主要用来监听 consumer 的状态信息。它有两个方法 becameActivebecameInactive,也就是当接收到状态改变时,client 根据状态信息,将当前的 consumer 置为相应的状态,即该 consumer 是否继续对外提供服务。

public interface ConsumerEventListener extends Serializable {
    void becameActive(Consumer<?> consumer, int partitionId);
    void becameInactive(Consumer<?> consumer, int partitionId);
}

client 端处理的逻辑主要在 ClientCnx 中的 handleActiveConsumerChange(),它首先会从 server 端获取当前 consumer 的状态信息,拿到状态改变的指令时,通过activeConsumerChanged() 来判定,当前 consumer 是否还可以继续对外提供服务。

handleActiveConsumerChange :

protected void handleActiveConsumerChange(CommandActiveConsumerChange change) {
        ConsumerImpl<?> consumer = consumers.get(change.getConsumerId());
        if (consumer != null) {
            consumer.activeConsumerChanged(change.getIsActive());
        }
    }

activeConsumerChanged:

void activeConsumerChanged(boolean isActive) {
        if (consumerEventListener == null) {
            return;
        }

        listenerExecutor.execute(() -> {
            if (isActive) {
                consumerEventListener.becameActive(this, partitionIndex);
            } else {
                consumerEventListener.becameInactive(this, partitionIndex);
            }
        });
    }

proto API

client 与 server 之间的交互信息定义在 CommandActiveConsumerChange 中,具体如下:

// changes on active consumer
message CommandActiveConsumerChange {
        required uint64 consumer_id = 1;
        optional bool is_active     = 2 [default = false];
}

server

broker 的相关实现先从 Topic 提供的 subscribe 接口看起,因为当 server 端接收到一个订阅信息时,会先从这里做出相应的处理。subscribe 分别被 PersistentTopicNonPersistentTopic 实现,我们以 PersistentTopic 为例。

  1. PersistentTopic 场景下,当接收到一个订阅信息时,它会做一系列的判断(这个不是本文的重点,在这里忽略),当所有的判断成功之后,说明当前的订阅请求符合创建一个 consumer 的逻辑,broker 会根据用户提供的信息,创建一个 consumer,并调用 subscription.addConsumer(consumer) 将当前创建的 consumer 加入进来。
subscriptionFuture.thenAccept(subscription -> {
            try {
                ledger.checkBackloggedCursors();
                Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel, consumerName,
                                                 maxUnackedMessages, cnx, cnx.getRole(), metadata, readCompacted, initialPosition);
                subscription.addConsumer(consumer);//划重点

当有新的 consumer 加入之后,broker 首先会去更新 cursor 的状态信息,更新 cursor 的信息之后会去判断又是一系列的判断。

注意:更新 cursor 的状态信息需要是线程安全的操作,所以在这里,程序会去判断,在当前更新 cursor 之后,是否有别的线程更新过 cursor 信息,如果更新过,IS_FENCED_UPDATER.get(this) == TRUE,代码如下:

cursor.updateLastActive();
if (IS_FENCED_UPDATER.get(this) == TRUE) {
    log.warn("Attempting to add consumer {} on a fenced subscription", consumer);
    throw new SubscriptionFencedException("Subscription is fenced");
}
  1. subscription.addConsumer(consumer) 会去调用 dispatcher.addConsumer(consumer)

dispatcher.addConsumer(consumer) 是一个接口,总共有 5 个类实现了该接口:

image.png

先来看 AbstractDispatcherSingleActiveConsumer 的逻辑,又是一系列的判断操作:

  1. 判断当前的 Dispatcher 是否已经被关闭,关闭的话,将当前的 consumer 与 broker 断开连接;
  2. 如果订阅类型为 Exclusive 且 当前 consumer 列表不为空的话,说明当前已经有 consumer 在订阅这个 topic,在 Exclusive 场景下,同一时刻只能有一个 consumer 订阅该 topic,所以这里会直接抛出 ConsumerBusyException 异常。
  3. 判断当前的 consumer 是否达到了用户设置的最大 consumer 数,达到的话,会直接抛出 ConsumerBusyException 异常。
  4. 如果订阅类型为 Failover 且达到了 consumer 的最大数,会直接抛出 ConsumerBusyException 异常。

如果上述条件都不满足,说明该 consumer 可以加入到当前的 consumers 列表中。截至目前,用户想要添加的 conusmer 才被添加进来,当收到用户要添加的 consumer 之后,会去判断 pickAndScheduleActiveConsumer, 如果不满足,通过 ACTIVE_CONSUMER_UPDATER 获取当前处于活跃状态的 consumer 信息,如果当前活跃的 consumer 为 null,退出。否则,调用 consumer 的 notifyActiveConsumerChange 进行通知,它会调用 Commands.newActiveConsumerChange,这个函数是真正与 client 端建立连接,通知 conusmer 状态的信息。

        consumers.add(consumer);

        if (!pickAndScheduleActiveConsumer()) {
            // the active consumer is not changed
            Consumer currentActiveConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
            if (null == currentActiveConsumer) {
                if (log.isDebugEnabled()) {
                    log.debug("Current active consumer disappears while adding consumer {}", consumer);
                }
            } else {
                consumer.notifyActiveConsumerChange(currentActiveConsumer);
            }
        }

相关文章

网友评论

      本文标题:Apache Pulsar 之 Failover 调度策略

      本文链接:https://www.haomeiwen.com/subject/sweeaqtx.html