Pulsar consumer seek(): a source code walkthrough

Author: wolf4j | Published: 2019-06-18 15:56

    seek

    The consumer seek interface in Pulsar currently supports two main ways of seeking:

    • messageID
    • publishTime

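    In addition to these two, there are two special ways of moving the cursor:

    • Earliest (move the cursor to the very beginning)
    • Latest (move the cursor to the very end; the default)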

    The Pulsar client and broker communicate using protobuf. seek is defined in the proto file as follows:

    message CommandSeek {
        required uint64 consumer_id = 1;
        required uint64 request_id  = 2;
    
        optional MessageIdData message_id = 3;
        optional uint64 message_publish_time = 4;
    }
    

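    message_id and message_publish_time are the two ways of performing a seek. consumer_id and request_id are common to both: every kind of seek uses these two fields. If further seek modes need to be supported later, they have to be added to the proto definition. The purpose of consumer_id and request_id is straightforward: consumer_id identifies the consumer on which the seek takes place, and request_id identifies the request that carries it. With these two fields, Pulsar can quickly pin down which seek operation it is dealing with.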

    MessageIdData is defined as follows:

    message MessageIdData {
        required uint64 ledgerId = 1;
        required uint64 entryId  = 2;
        optional int32 partition = 3 [default = -1];
        optional int32 batch_index = 4 [default = -1];
    }
    

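    A MessageID has four fields in total. All four are generated by Pulsar itself; users can only read them, not set them. ledgerId and entryId are the fields used by BookKeeper (the bookies). The last two fields describe the context of the message: which partition it belongs to, and whether batching is in use (batch_index keeps its default of -1 otherwise). Together, these four fields form a messageID that uniquely identifies a message.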
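
To make the role of the four fields concrete, here is a minimal sketch of a value type that mirrors MessageIdData. The class name SketchMessageId, its string form, and the field-by-field ordering are assumptions made for illustration; this is not the MessageIdImpl class from Pulsar.

```java
import java.util.Objects;

// Hypothetical value type mirroring the four MessageIdData fields.
final class SketchMessageId implements Comparable<SketchMessageId> {
    final long ledgerId;
    final long entryId;
    final int partition;   // -1 mirrors the proto default
    final int batchIndex;  // -1 mirrors the proto default

    SketchMessageId(long ledgerId, long entryId) {
        this(ledgerId, entryId, -1, -1);
    }

    SketchMessageId(long ledgerId, long entryId, int partition, int batchIndex) {
        this.ledgerId = ledgerId;
        this.entryId = entryId;
        this.partition = partition;
        this.batchIndex = batchIndex;
    }

    @Override
    public int compareTo(SketchMessageId other) {
        // Assumed ordering: ledger, then entry, then partition, then batch index.
        int c = Long.compare(ledgerId, other.ledgerId);
        if (c == 0) c = Long.compare(entryId, other.entryId);
        if (c == 0) c = Integer.compare(partition, other.partition);
        if (c == 0) c = Integer.compare(batchIndex, other.batchIndex);
        return c;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof SketchMessageId)) return false;
        return compareTo((SketchMessageId) o) == 0;
    }

    @Override
    public int hashCode() {
        return Objects.hash(ledgerId, entryId, partition, batchIndex);
    }

    @Override
    public String toString() {
        return ledgerId + ":" + entryId + ":" + partition + ":" + batchIndex;
    }
}
```

Two ids are equal only when all four fields match, which is the "uniquely identifies a message" property described above.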

    Let's now look at the source code to see how Pulsar handles a seek operation.

    We start with the logic of handleSeek(CommandSeek seek) in ServerCnx. Note that in Pulsar, every request type has a corresponding handler function in ServerCnx.

    protected void handleSeek(CommandSeek seek) {
            // 1. First check that the connection is in the Connected state
            checkArgument(state == State.Connected);
            // 2. Read the request_id
            final long requestId = seek.getRequestId();
            // 3. Look up the consumer (as a future) by consumer_id
            CompletableFuture<Consumer> consumerFuture = consumers.get(seek.getConsumerId());
    
            // 4. Check whether a messageId or a messagePublishTime is present; if neither is, reply with an error and return.
            if (!seek.hasMessageId() && !seek.hasMessagePublishTime()) {
                ctx.writeAndFlush(
                        Commands.newError(requestId, ServerError.MetadataError, "Message id and message publish time were not present"));
                return;
            }
    
            // 5. Check that the consumer was created successfully
            boolean consumerCreated = consumerFuture != null && consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally();
    
            // 6-1. If the consumer is healthy and the seek is by messageID, take this branch.
            if (consumerCreated && seek.hasMessageId()) {
                // Get the consumer
                Consumer consumer = consumerFuture.getNow(null);
                // Get the subscription this consumer belongs to
                Subscription subscription = consumer.getSubscription();
                // Get the message id carried by the seek request
                MessageIdData msgIdData = seek.getMessageId();
    
                // Build the position of the message in BookKeeper from the ledgerId and entryId of the messageID
                Position position = new PositionImpl(msgIdData.getLedgerId(), msgIdData.getEntryId());
    
    
                // Reset the cursor to that position
                subscription.resetCursor(position).thenRun(() -> {
                    log.info("[{}] [{}][{}] Reset subscription to message id {}", remoteAddress,
                            subscription.getTopic().getName(), subscription.getName(), position);
                    ctx.writeAndFlush(Commands.newSuccess(requestId));
                }).exceptionally(ex -> {
                    log.warn("[{}][{}] Failed to reset subscription: {}", remoteAddress, subscription, ex.getMessage(), ex);
                    ctx.writeAndFlush(Commands.newError(requestId, ServerError.UnknownError,
                            "Error when resetting subscription: " + ex.getCause().getMessage()));
                    return null;
                });
            // 6-2. If the seek is by publishTime, take this branch
            } else if (consumerCreated && seek.hasMessagePublishTime()){
                Consumer consumer = consumerFuture.getNow(null);
                Subscription subscription = consumer.getSubscription();
                long timestamp = seek.getMessagePublishTime();
    
                subscription.resetCursor(timestamp).thenRun(() -> {
                    log.info("[{}] [{}][{}] Reset subscription to publish time {}", remoteAddress,
                            subscription.getTopic().getName(), subscription.getName(), timestamp);
                    ctx.writeAndFlush(Commands.newSuccess(requestId));
                }).exceptionally(ex -> {
                    log.warn("[{}][{}] Failed to reset subscription: {}", remoteAddress, subscription, ex.getMessage(), ex);
                    ctx.writeAndFlush(Commands.newError(requestId, ServerError.UnknownError,
                            "Reset subscription to publish time error: " + ex.getCause().getMessage()));
                    return null;
                });
            // 6-3. Anything else is an error
            } else {
                ctx.writeAndFlush(Commands.newError(requestId, ServerError.MetadataError, "Consumer not found"));
            }
        }
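
The branching in handleSeek can be condensed into a small decision function. The sketch below is a simplified model with hypothetical names (SeekDispatchSketch, Outcome); it only reproduces the order of the checks shown above, including the fact that a message id takes precedence when both fields are set.

```java
// Hypothetical model of the decision order in handleSeek; not broker code.
final class SeekDispatchSketch {
    enum Outcome {
        RESET_BY_MESSAGE_ID,
        RESET_BY_PUBLISH_TIME,
        ERROR_MISSING_TARGET,      // neither message_id nor message_publish_time present
        ERROR_CONSUMER_NOT_FOUND   // consumer future missing, pending, or failed
    }

    static Outcome dispatch(boolean consumerCreated, boolean hasMessageId, boolean hasPublishTime) {
        // Step 4: a seek without any target is rejected before the consumer is inspected.
        if (!hasMessageId && !hasPublishTime) {
            return Outcome.ERROR_MISSING_TARGET;
        }
        // Step 6-1: the message id branch is tested first, so it wins if both are set.
        if (consumerCreated && hasMessageId) {
            return Outcome.RESET_BY_MESSAGE_ID;
        }
        // Step 6-2: seek by publish time.
        if (consumerCreated && hasPublishTime) {
            return Outcome.RESET_BY_PUBLISH_TIME;
        }
        // Step 6-3: a target is present but the consumer is not usable.
        return Outcome.ERROR_CONSUMER_NOT_FOUND;
    }
}
```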
    

    The resetCursor implementation:

    private void resetCursor(Position finalPosition, CompletableFuture<Void> future) {
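            // 1. Guard against concurrent resets: atomically flip the fenced flag from false to true.
            //    If it is already true, someone else is operating on the cursor and this reset fails.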
            if (!IS_FENCED_UPDATER.compareAndSet(PersistentSubscription.this, FALSE, TRUE)) {
                future.completeExceptionally(new SubscriptionBusyException("Failed to fence subscription"));
                return;
            }
    
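            // 2. The broker has to disconnect all consumers. If they stayed connected, they could still
            //    receive messages while the cursor is being moved, which would be uncontrollable.
            //    After the broker disconnects them, the client triggers its reconnect logic (see grabCnx()).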
            final CompletableFuture<Void> disconnectFuture;
            if (dispatcher != null && dispatcher.isConsumerConnected()) {
                disconnectFuture = dispatcher.disconnectAllConsumers();
            } else {
                disconnectFuture = CompletableFuture.completedFuture(null);
            }
    
            disconnectFuture.whenComplete((aVoid, throwable) -> {
                if (throwable != null) {
                    log.error("[{}][{}] Failed to disconnect consumer from subscription", topicName, subName, throwable);
                    IS_FENCED_UPDATER.set(PersistentSubscription.this, FALSE);
                    future.completeExceptionally(
                            new SubscriptionBusyException("Failed to disconnect consumers from subscription"));
                    return;
                }
                log.info("[{}][{}] Successfully disconnected consumers from subscription, proceeding with cursor reset",
                        topicName, subName);
    
                try {
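                    // 3. Call asyncResetCursor to move the cursor. From here on the code is in the
                    //    managed ledger layer, which sits on top of the BookKeeper client. In Pulsar, cursor
                    //    information is stored in BookKeeper, and the ledgerId and entryId of the messageID
                    //    tell BookKeeper where the message is located.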
                    cursor.asyncResetCursor(finalPosition, new AsyncCallbacks.ResetCursorCallback() {
                        // 4. Whether the reset succeeds or fails, IS_FENCED_UPDATER must be set back to false to mark this reset as finished
                        @Override
                        public void resetComplete(Object ctx) {
                            if (log.isDebugEnabled()) {
                                log.debug("[{}][{}] Successfully reset subscription to position {}", topicName, subName,
                                        finalPosition);
                            }
                            IS_FENCED_UPDATER.set(PersistentSubscription.this, FALSE);
                            future.complete(null);
                        }
    
                        @Override
                        public void resetFailed(ManagedLedgerException exception, Object ctx) {
                            log.error("[{}][{}] Failed to reset subscription to position {}", topicName, subName,
                                    finalPosition, exception);
                            IS_FENCED_UPDATER.set(PersistentSubscription.this, FALSE);
                            // todo - retry on InvalidCursorPositionException
                            // or should we just ask user to retry one more time?
                            if (exception instanceof InvalidCursorPositionException) {
                                future.completeExceptionally(new SubscriptionInvalidCursorPosition(exception.getMessage()));
                            } else if (exception instanceof ConcurrentFindCursorPositionException) {
                                future.completeExceptionally(new SubscriptionBusyException(exception.getMessage()));
                            } else {
                                future.completeExceptionally(new BrokerServiceException(exception));
                            }
                        }
                    });
                } catch (Exception e) {
                    log.error("[{}][{}] Error while resetting cursor", topicName, subName, e);
                    IS_FENCED_UPDATER.set(PersistentSubscription.this, FALSE);
                    future.completeExceptionally(new BrokerServiceException(e));
                }
            });
        }
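
The fencing pattern used by resetCursor can be isolated in a few lines. This is a minimal sketch under stated assumptions: FencedSubscriptionSketch is a hypothetical class, the pendingReset future stands in for the disconnect-then-reset work, and IllegalStateException replaces SubscriptionBusyException.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

// Hypothetical sketch of the compare-and-set fence; not PersistentSubscription.
final class FencedSubscriptionSketch {
    private static final int FALSE = 0;
    private static final int TRUE = 1;
    private static final AtomicIntegerFieldUpdater<FencedSubscriptionSketch> IS_FENCED_UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(FencedSubscriptionSketch.class, "isFenced");

    private volatile int isFenced = FALSE;

    // pendingReset models the asynchronous work performed while the fence is held.
    CompletableFuture<Void> resetCursor(CompletableFuture<Void> pendingReset) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        // Only one caller can flip the flag from FALSE to TRUE; everyone else fails fast.
        if (!IS_FENCED_UPDATER.compareAndSet(this, FALSE, TRUE)) {
            result.completeExceptionally(new IllegalStateException("Failed to fence subscription"));
            return result;
        }
        pendingReset.whenComplete((ignored, error) -> {
            // Release the fence on success and on failure alike.
            IS_FENCED_UPDATER.set(this, FALSE);
            if (error != null) {
                result.completeExceptionally(error);
            } else {
                result.complete(null);
            }
        });
        return result;
    }

    boolean fenced() {
        return isFenced == TRUE;
    }
}
```

A second reset issued while the first is still pending fails immediately, and a later reset succeeds once the flag has been cleared.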
    

    asyncResetCursor:

    public void asyncResetCursor(Position newPos, AsyncCallbacks.ResetCursorCallback callback) {
            // 1. Check that the newPos argument has the expected type
            checkArgument(newPos instanceof PositionImpl);
            // 2. Cast newPos from Position to PositionImpl
            final PositionImpl newPosition = (PositionImpl) newPos;
    
            // order trim and reset operations on a ledger
            ledger.getExecutor().executeOrdered(ledger.getName(), safeRun(() -> {
                // 3. Check whether the position can be used:
                // 3-1: newPosition is a valid position in the ledger
                // 3-2: or it is earliest
                // 3-3: or it is latest
                if (ledger.isValidPosition(newPosition) || newPosition.equals(PositionImpl.earliest)
                        || newPosition.equals(PositionImpl.latest)) {
                    // 4. If so, call internalResetCursor
                    internalResetCursor(newPosition, callback);
                } else {
                    // caller (replay) should handle this error and retry cursor reset
                    callback.resetFailed(new ManagedLedgerException.InvalidCursorPositionException(newPosition.toString()),
                            newPosition);
                }
            }));
        }
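
asyncResetCursor submits its work through executeOrdered with the ledger name as the key, so that trim and reset operations on one ledger run in order. The sketch below shows one common way to get that guarantee: hash the key onto a fixed set of single-threaded workers. OrderedExecutorSketch is a hypothetical class, and this is an assumption about the general technique rather than a copy of the executor Pulsar uses.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical key-ordered executor: tasks with the same key run one after another.
final class OrderedExecutorSketch {
    private final ExecutorService[] workers;

    OrderedExecutorSketch(int threads) {
        workers = new ExecutorService[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = Executors.newSingleThreadExecutor();
        }
    }

    void executeOrdered(String key, Runnable task) {
        // The same key always maps to the same single-threaded worker,
        // so submission order is preserved for that key.
        int index = Math.floorMod(key.hashCode(), workers.length);
        workers[index].execute(task);
    }

    // Stops accepting tasks and waits for queued ones; returns false on timeout or interrupt.
    boolean shutdownAndAwait() {
        for (ExecutorService worker : workers) {
            worker.shutdown();
        }
        try {
            for (ExecutorService worker : workers) {
                if (!worker.awaitTermination(5, TimeUnit.SECONDS)) {
                    return false;
                }
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Tasks for different keys may still run in parallel; only tasks that share a key are serialized.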
    

    internalResetCursor:

    protected void internalResetCursor(PositionImpl position, AsyncCallbacks.ResetCursorCallback resetCursorCallback) {
            // 1. If the position is earliest or latest, resolve it to the first position or to the position after the last entry
            if (position.equals(PositionImpl.earliest)) {
                position = ledger.getFirstPosition();
            } else if (position.equals(PositionImpl.latest)) {
                position = ledger.getLastPosition().getNext();
            }
    
            log.info("[{}] Initiate reset position to {} on cursor {}", ledger.getName(), position, name);
    
            // 2. Make sure only one resetCursor is in progress at a time
            synchronized (pendingMarkDeleteOps) {
                if (!RESET_CURSOR_IN_PROGRESS_UPDATER.compareAndSet(this, FALSE, TRUE)) {
                    log.error("[{}] reset requested - position [{}], previous reset in progress - cursor {}",
                            ledger.getName(), position, name);
                    resetCursorCallback.resetFailed(
                            new ManagedLedgerException.ConcurrentFindCursorPositionException("reset already in progress"),
                            position);
                }
            }
    
            final AsyncCallbacks.ResetCursorCallback callback = resetCursorCallback;
    
            // 3. Keep a final reference to position, named newPosition, so the callback below can capture it
            final PositionImpl newPosition = position;
    
            VoidCallback finalCallback = new VoidCallback() {
                @Override
                public void operationComplete() {
    
                    // modify mark delete and read position since we are able to persist new position for cursor
                    lock.writeLock().lock();
                    try {
                        // 4. Get the position immediately before newPosition
                        PositionImpl newMarkDeletePosition = ledger.getPreviousPosition(newPosition);
    
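                        // 5. Adjust the consumed-messages counter by the number of entries between the old and new mark-delete positions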
                        if (markDeletePosition.compareTo(newMarkDeletePosition) >= 0) {
                            messagesConsumedCounter -= getNumberOfEntries(
                                    Range.closedOpen(newMarkDeletePosition, markDeletePosition));
                        } else {
                            messagesConsumedCounter += getNumberOfEntries(
                                    Range.closedOpen(markDeletePosition, newMarkDeletePosition));
                        }
                        // 6. Assign newMarkDeletePosition to markDeletePosition, which marks the position up to which messages are treated as consumed and may be deleted.
                        markDeletePosition = newMarkDeletePosition;
                        
                        lastMarkDeleteEntry = new MarkDeleteEntry(newMarkDeletePosition, Collections.emptyMap(),
                                null, null);
                        individualDeletedMessages.clear();
    
                        PositionImpl oldReadPosition = readPosition;
                        if (oldReadPosition.compareTo(newPosition) >= 0) {
                            log.info("[{}] reset position to {} before current read position {} on cursor {}",
                                    ledger.getName(), newPosition, oldReadPosition, name);
                        } else {
                            log.info("[{}] reset position to {} skipping from current read position {} on cursor {}",
                                    ledger.getName(), newPosition, oldReadPosition, name);
                        }
                        // 7. readPosition is where the consumer starts reading from. This is the key step of resetCursor: readPosition is moved to the seek target.
                        readPosition = newPosition;
                    } finally {
                        lock.writeLock().unlock();
                    }
        }
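
The core bookkeeping of asyncResetCursor and internalResetCursor can be shown on a toy model. The sketch assumes a single ledger whose positions are plain entry ids starting at 0; ToyCursor and its EARLIEST and LATEST sentinel values are hypothetical and chosen only for this example.

```java
// Hypothetical single-ledger cursor; positions are entry ids 0..lastEntryId.
final class ToyCursor {
    static final long EARLIEST = -1L;             // sentinel chosen for this sketch
    static final long LATEST = Long.MAX_VALUE;    // sentinel chosen for this sketch

    private final long lastEntryId;
    private long markDeletePosition = -1L;  // nothing consumed yet
    private long readPosition = 0L;         // next entry to read

    ToyCursor(long entryCount) {
        if (entryCount <= 0) {
            throw new IllegalArgumentException("entryCount must be positive");
        }
        this.lastEntryId = entryCount - 1;
    }

    synchronized void reset(long target) {
        long resolved;
        if (target == EARLIEST) {
            resolved = 0L;                       // first position in the ledger
        } else if (target == LATEST) {
            resolved = lastEntryId + 1;          // position after the last entry
        } else if (target >= 0 && target <= lastEntryId) {
            resolved = target;                   // a valid concrete position
        } else {
            throw new IllegalArgumentException("invalid position: " + target);
        }
        // Everything before the target counts as consumed; reading resumes at the target.
        markDeletePosition = resolved - 1;
        readPosition = resolved;
    }

    synchronized long markDeletePosition() {
        return markDeletePosition;
    }

    synchronized long readPosition() {
        return readPosition;
    }
}
```

With ten entries, reset(5) leaves the mark-delete position at 4 and the read position at 5, which mirrors steps 4, 6 and 7 in the code above.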
    

    To sum up, even a simple resetCursor operation involves quite a lot of logic. The figure below shows the call relationships.

    [Figure: seek.png, call graph of the seek operation]

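    The above covers what the broker does when it receives a seek request. Next, let's look at how the client delivers the request to the broker once the user calls the seek interface.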

    client

    The seek interface is exposed to users on the Consumer interface, in two forms:

    • void seek(MessageId messageId) throws PulsarClientException;
      • seek by messageID
    • void seek(long timestamp) throws PulsarClientException;
      • seek by publishTime
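
Both signatures are blocking, while the implementation examined in this section is asynchronous. One plausible way to relate the two is to block on the future and unwrap its cause, as in the sketch below. BlockingSeekSketch and SeekFailedException are hypothetical names, and this wrapping is an assumption for illustration, not code quoted from the client.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;

// Hypothetical blocking wrapper around an asynchronous seek.
final class BlockingSeekSketch {
    // Unchecked stand-in for PulsarClientException, to keep the sketch short.
    static final class SeekFailedException extends RuntimeException {
        SeekFailedException(Throwable cause) {
            super(cause);
        }
    }

    static void seek(Supplier<CompletableFuture<Void>> seekAsync) {
        try {
            // Block until the broker has answered the seek request.
            seekAsync.get().get();
        } catch (ExecutionException e) {
            // Surface the original failure instead of the wrapper.
            throw new SeekFailedException(e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new SeekFailedException(e);
        }
    }
}
```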

    The client implementation is relatively simple: it builds the data a seek request needs and sends it to the broker, as follows:

    public CompletableFuture<Void> seekAsync(MessageId messageId) {
            // Check that the consumer is not closing or already closed
            if (getState() == State.Closing || getState() == State.Closed) {
                return FutureUtil
                        .failedFuture(new PulsarClientException.AlreadyClosedException("Consumer was already closed"));
            }
    
            // Check that the consumer is connected to the broker
            if (!isConnected()) {
                return FutureUtil.failedFuture(new PulsarClientException("Not connected to broker"));
            }
    
            final CompletableFuture<Void> seekFuture = new CompletableFuture<>();
    
            // Allocate a new requestId
            long requestId = client.newRequestId();
            // Cast the messageId supplied by the user to MessageIdImpl
            MessageIdImpl msgId = (MessageIdImpl) messageId;
            // newSeek builds the seek request to be sent from the information above
            ByteBuf seek = Commands.newSeek(consumerId, requestId, msgId.getLedgerId(), msgId.getEntryId());
            // Get the client connection
            ClientCnx cnx = cnx();
    
            log.info("[{}][{}] Seek subscription to message id {}", topic, subscription, messageId);
    
            // Send the seek request, tagged with the requestId of this request
            cnx.sendRequestWithId(seek, requestId).thenRun(() -> {
                log.info("[{}][{}] Successfully reset subscription to message id {}", topic, subscription, messageId);
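                // acknowledgmentsGroupingTracker avoids acking one message at a time. Previously every received
                // message triggered its own ack; now multiple acks are collected into a group and flushed
                // periodically, which saves resources and improves efficiency.
                // ConsumerBuilder provides acknowledgmentGroupTime() so users can set the flush interval; the default is 100 ms.
                // Note that setting this interval to 0 sends acknowledgments immediately.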
                acknowledgmentsGroupingTracker.flushAndClean();
                lastDequeuedMessage = messageId;
                // Clear the incoming message queue of this consumer
                incomingMessages.clear();
                seekFuture.complete(null);
            }).exceptionally(e -> {
                log.error("[{}][{}] Failed to reset subscription: {}", topic, subscription, e.getCause().getMessage());
                seekFuture.completeExceptionally(e.getCause());
                return null;
            });
            return seekFuture;
        }
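
The role of requestId in sendRequestWithId can be illustrated with a small correlation table: each outgoing request registers a future under its id, and the response handler completes the matching future. PendingRequestsSketch and its methods are hypothetical names for this sketch, and no network I/O is performed.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical request/response correlation by request id; not ClientCnx.
final class PendingRequestsSketch {
    private final AtomicLong requestIdGenerator = new AtomicLong();
    private final ConcurrentHashMap<Long, CompletableFuture<Void>> pending = new ConcurrentHashMap<>();

    long newRequestId() {
        return requestIdGenerator.getAndIncrement();
    }

    CompletableFuture<Void> sendRequestWithId(long requestId) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        pending.put(requestId, future);
        // A real client would write the serialized command to the connection here.
        return future;
    }

    // Called when a success response carrying requestId arrives.
    void handleSuccess(long requestId) {
        CompletableFuture<Void> future = pending.remove(requestId);
        if (future != null) {
            future.complete(null);
        }
    }

    // Called when an error response carrying requestId arrives.
    void handleError(long requestId, String message) {
        CompletableFuture<Void> future = pending.remove(requestId);
        if (future != null) {
            future.completeExceptionally(new RuntimeException(message));
        }
    }

    int pendingCount() {
        return pending.size();
    }
}
```

The callbacks attached with thenRun and exceptionally in seekAsync correspond to the success and error completions in this model.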
    

    Seeking by publishTime works the same way, so it is not repeated here. That is essentially the whole flow of a seek operation.

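    Additional note: when a seek command sent by the client reaches the broker, the broker disconnects all consumers currently connected to the subscription. The principle is simple: the existing consumers must not keep consuming, because seek changes the position of the consumer cursor.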


          Article link: https://www.haomeiwen.com/subject/hdufqctx.html