美文网首页Apache Pulsar 指南
Apache Pulsar 源码走读(五)服务端消息流转逻辑概述

Apache Pulsar 源码走读(五)服务端消息流转逻辑概述

作者: WJL3333 | 来源:发表于2021-02-21 18:28 被阅读0次

    版权归本人所有,如果转载请联系本人

    这篇大致说一下消息从写入到读取在Apache Pulsar服务端是怎么串起来的。
    ( 这篇不会详细说明每个逻辑怎么走的,不过会给读者一个整体的俯瞰印象。)

    首先说一下表示业务逻辑的几个对象。(都在org.apache.pulsar.broker.service 这个包里面)

    Pulsar服务端的主要逻辑对象

    Topic: 这个对象在服务端就表示一个topic。这个类是最上层的,所有逻辑都被组织到这个对象里面。

    • 负责管理org.apache.pulsar.broker.service.Producer (这里是服务端跟踪状态的对象)
    • 负责管理Subscription
    • 负责数据写入

    Subscription 这个对象表示一份数据的消费进度。

    • 负责管理Consumer(这里是服务端跟踪状态的对象)
    • 负责维护一份数据的消费进度(单独ack,累积ack,哪些已经消费哪些还没有消费)
    • 读取消息,负责消息的分发(哪个消费分给哪个consumer)
    • 消息重发,延迟投递。

    上面这2个对象,在存储层对应的就是Topic -> ManagedLedgerSubscription -> ManagedCursor

    消息写入

    通过找到服务端记录的Producer对象,经过一些逻辑处理(消息去重,加密,状态检查等)
    确定Topic 最后写入到ManagedLedger里面。

    消息读取

    则是客户端通过CommandFlow 告知自己当前可以处理消息的状态,
    触发消息的分发流程。通过从Consumer 拿到 Subscription 触发消息的读取和Dispatch的过程。

    写入逻辑相对读取逻辑来说比较直观。写入到ManagedLedger 即可。主要说下Subscription

    Pulsar的消息派发流程(Dispatcher)

    我们先单独看一下一个分区的topic

    • 有多个producer
    • 有一个subscription按照Shared方式消费。
    • 这个subscription里面有多个consumer(比如说3个consumer)
    • 每个consumer可能会单独ack某一个消息。

    假设这个topic的数据是一个纸带,如果确认消费好了一个消息就涂黑一段。
    写入的话就是不断给纸带增加长度。
    我们把那些已经分发出去的消息但是还没有ack的消息认为是灰色的。

    服务端会记录每个consumer当前的一个队列容量。
    如果consumer的队列可以接受更多消息的时候会主动发送CommandFlow
    请求给服务端,来标识自己能接受更多的消息。

    C1 -> 10
    C2 -> 4
    C3 -> 8

    有新消息写入的时候,如果为了减少延迟,最好马上能通知每个Subscription 有新的消息到来了。
    这样的话,比如说新写入了20条消息,则会触发一个消息分发的动作来把这20条消息读取出来(很可能走的cache)之后按照客户端和消息状态分给这3个consumer。

    Dispatcher

    这个类是Subscription 对象里面的一个成员变量。

    当消息从ManagedLedger 里面成功读取之后,这个类需要按照consumer的状态和消息的一些属性
    把消息推送给consumer(客户端)。

    根据订阅方式的不同分发的逻辑也有区别

    • Exclusive: 这个模式比较容易,所有消息都会分给唯一的一个Active Consumer
    • Failover: 这个模式在Exclusive的基础上,增加了一个切换consumer的逻辑。不过还是分发消息给一个consumer
    • Shared: 这个模式是按照轮训的方式将这个topic的消息分发给多个consumer,同时consumer发生变化的时候也需要做一些逻辑来调整分发逻辑。
    • Key_Shared:这个模式也是分发给多个consumer,不过可以一定程度上保证一个key上面的消息是有顺序的消费的(不一定严格保证)

    这样一个Dispatcher 需要有以下功能

    • 跟踪consumer状态:consumer队列容量,consumer个数变化。(因为会影响消息派发的逻辑)
    • 消息分发:根据当前状态确定读取成功的消息要分配到哪些consumer上面,并把消息推送给consumer
    • 消息重新投递的逻辑(replay):有的消息被consumer标记需要重新投递了,这样的话需要重新读取并分发。

    消费进度跟踪

    1. 消息确认方式

    消息确认可以是累积确认或者是单独确认的。

    • 累积确认比较容易。
      如果一个位置被确认那么之前的消息都认为是成功消费了的(kafka是这样的方式)
      不过这样灵活性比较差,如果一个消费不成功的话,可能就卡在这个位置了,影响其他消息的处理。

    • 单独确认的话,某条消息可以直接ack,而不会影响其他消息ack的状态被覆盖。

    2. 单条消息ack状态的维护(需要持久化)

    单独确认的话,相当于纸带上有一些位置之前全是黑色的,这种是某个点位之前全都消费完的。
    我们称这个点位是deletePoint。

    有一些位置是黑色和灰色相间的,这种就是某些消息已经被标记ack了,
    有些已经被投递到客户端但是还没有ack。
    随着单独确认逐渐累积,这样黑色的部分会慢慢连接起来。
    这样的话这个deletePoint就可以推进了。

    我们再看一下batch的情况,上面说的消息都是针对单条消息的(一个Entry)
    如果这个Entry里面的消息是做了batch发送的,里面会包含多条信息。

    客户端可能会单独ack这个batch里面的某个消息。
    这样的话我们需要记录这个Entry的batch里面哪些消息是已经被ack的。
    对于这种消息,Dispatcher在投递消息的时候会带着一个位图来标记这个Entry里面哪些消息是已经处理的。
    这样consumer会按照这个bitmap来进行过滤。

    3. 消息重发状态跟踪(非持久化)

    灰色的位置我们称为当前的readPoint,标记的是当前读取结尾的位置。如果有新的数据写入的话,dispatcher会从这个位置尝试读取新写入的数据,并推进这个readPoint。

    如果有的消息已经被发送给consumer了,但是这个消息consumer又通知服务端说需要重新投递。
    则这个时候就需要标记readPoint之前的某个消息需要重新读取,这个重新读取的话不会更改readPoint。
    同时因为这个消息没有被ack。所以这个redeliver的消息不需要被持久化。
    (重新加载之后就可以根据ack的状态推断出来)

    4. 消费进度持久化状态

    那么如果这个topic被unload到其他的broker。对于一个Subscription需要加载哪些状态呢?

    主要状态就是哪些消息已经ack了。同时确认下一次读取的位置是什么。

    需要知道记录单独ack消息的状态和batch ack信息的状态。

    ManagedCursor 从 ledger(状态被写到bookkeeper里面)或者zk(写入一直失败的话会fallback到zk上)
    读取到这个状态的话,按照相应逻辑恢复即可。

    
    // MLDataFormats.proto
    
    message ManagedCursorInfo {
        // If the ledger id is -1, then the mark-delete position is
        // the one from the (ledgerId, entryId) snapshot below
        required int64 cursorsLedgerId = 1;
    
        // Last snapshot of the mark-delete position
        optional int64 markDeleteLedgerId = 2;
        optional int64 markDeleteEntryId  = 3;
    
    
        // 记录了单独某个消息被ack的状态
        repeated MessageRange individualDeletedMessages = 4;
    
        // Additional custom properties associated with
        // the current cursor position
        repeated LongProperty properties = 5;
    
        // 时间戳
        optional int64 lastActive = 6;
    
        // Store which index in the batch message has been deleted
       
        // batch 的 ack状态记录
        repeated BatchedEntryDeletionIndexInfo batchedEntryDeletionIndexInfo = 7;
    }
    

    相关文章

      网友评论

        本文标题:Apache Pulsar 源码走读(五)服务端消息流转逻辑概述

        本文链接:https://www.haomeiwen.com/subject/eouqfltx.html