Kafka Transaction Analysis

Author: WestC | Published 2019-11-05 19:23

    Kafka Idempotence

    1. What idempotence means in Kafka

      Idempotence was originally defined for the HTTP protocol: one request or several identical requests to the same resource should have the same effect on that resource. From Kafka's point of view, if a client sends the same message multiple times, the Kafka broker should persist that message only once.

    2. How Kafka implements idempotence

      1. Problems

        • While producing, if the broker persists a message but crashes before returning the ack, the producer retries the send, which can duplicate the message
        • If an earlier message fails while a later one succeeds, and the earlier message then succeeds on retry, the messages end up out of order
      2. Solution

        Kafka introduces a Producer ID (PID) and a Sequence Number (SN) to implement idempotence.

        • Producer side:

        Each Kafka producer is initialized with a PID. Every message it sends carries a producer SN that increases monotonically.

        • Broker side:

        The broker maintains a sequence number (broker SN) for each {PID, Topic, Partition}. After a message passes the check and is written, the broker SN is updated to that message's SN.

        When the producer sends a message and the broker receives it (sketched in code below):

        If producer SN = broker SN + 1, the broker keeps the message and advances the broker SN by 1.

        If producer SN > broker SN + 1, some earlier data has not been written yet (out of order), so the message is rejected.

        If producer SN ≤ broker SN, the message is a resend (duplicate) and is dropped.
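
        This check can be written out roughly as follows (a simplified sketch of the rule above; the real broker-side check is part of producer state management and also involves the producer epoch):

        // Simplified sketch of the broker-side sequence check for one {PID, Topic, Partition}.
        // brokerSn is the SN of the last message successfully written for that key.
        sealed trait SequenceCheck
        case object Accept     extends SequenceCheck // write the message and advance the broker SN
        case object OutOfOrder extends SequenceCheck // an earlier message has not been written yet: reject
        case object Duplicate  extends SequenceCheck // already written before: drop

        def checkSequence(producerSn: Int, brokerSn: Int): SequenceCheck =
          if (producerSn == brokerSn + 1) Accept
          else if (producerSn > brokerSn + 1) OutOfOrder
          else Duplicate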

    Kafka Transactions

    There are plenty of introductions to Kafka transactions online; this article analyzes the complete consume-transform-produce flow from the angle of the API call order and the API implementations.
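
    As a reference for the analysis that follows, a minimal consume-transform-produce loop looks roughly like this. It is a sketch only: the broker address and topic names are placeholders, error handling is omitted (see section 6 for commit/abort), and in the loop the transaction is begun only after poll() returns records:

      import java.time.Duration
      import java.util.{Collections, Properties, HashMap => JHashMap}
      import scala.collection.JavaConverters._
      import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer, OffsetAndMetadata}
      import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
      import org.apache.kafka.common.TopicPartition
      import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}

      object ConsumeTransformProduce extends App {
        val producerProps = new Properties()
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")  // placeholder broker address
        producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "TesttranId")       // transactional.id; also turns on idempotence (PID/SN)
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
        val producer = new KafkaProducer[String, String](producerProps)

        val consumerProps = new Properties()
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")  // placeholder broker address
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "MyConsumer")                // consumer group id
        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")           // offsets are committed through the transaction
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
        val consumer = new KafkaConsumer[String, String](consumerProps)
        consumer.subscribe(Collections.singletonList("tranTopic"))                     // input topic

        producer.initTransactions()                                  // section 1: find the Transaction Coordinator, obtain the PID
        while (true) {
          val records = consumer.poll(Duration.ofMillis(500))        // section 3: consumer group protocol (SYNC_GROUP, ...)
          if (!records.isEmpty) {
            producer.beginTransaction()                              // section 2: client-side state change only
            val offsets = new JHashMap[TopicPartition, OffsetAndMetadata]()
            for (r <- records.asScala) {
              producer.send(new ProducerRecord[String, String]("TopicTest", r.value())) // section 4: produce to the output topic
              offsets.put(new TopicPartition(r.topic(), r.partition()), new OffsetAndMetadata(r.offset() + 1))
            }
            producer.sendOffsetsToTransaction(offsets, "MyConsumer") // section 5: offsets become part of the transaction
            producer.commitTransaction()                             // section 6: END_TXN, then WRITE_TXN_MARKERS
          }
        }
      }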

    1. KafkaProducer#initTransactions()

    Finds the Transaction Coordinator and obtains a Producer ID (PID).

    • The Transaction Coordinator lookup is implicit: TransactionManager#initializeTransactions builds an InitProducerIdHandler and puts it into pendingRequests

    • Sender#run calls maybeSendTransactionalRequest; when handling the InitProducerIdHandler it first dequeues that request, and since no Transaction Coordinator is known yet it calls transactionManager.lookupCoordinator to put a FindCoordinatorHandler into pendingRequests and re-enqueues the InitProducerIdHandler

      1. How the Transaction Coordinator lookup is handled

      The FindCoordinator request is handled by handleFindCoordinatorRequest, the same code path that answers Group Coordinator lookups. Its main logic is to take the hash of the transactionalId modulo the partition count of the transaction topic to find the corresponding partition id, and to return the coordinator information for that partition:

      Utils.abs(transactionalId.hashCode) % transactionTopicPartitionCount

      2. The Transaction Coordinator generates the PID via handleInitProducerIdRequest(request)

      This method first checks whether a transaction already exists for the transactionalId and branches on the previous transaction state. If there is no prior state, it generates a PID for the request, writes this information to __transaction_state, and returns an InitProducerIdResult.

      Request and response:

      Request InitProducerIdRequestData(transactionId, transactionTimeout)
      Response case class InitProducerIdResult(producerId: Long, producerEpoch: Short, error: Errors)

      Information written to __transaction_state:

      topic __transaction_state
      Key transactionalId
      Value private[transaction] case class TxnTransitMetadata(producerId: Long, producerEpoch: Short,txnTimeoutMs: Int, txnState: TransactionState, topicPartitions: immutable.Set[TopicPartition], txnStartTimestamp: Long, txnLastUpdateTimestamp: Long)
      Time time.milliseconds()

      The message looks like this:

      baseOffset: 4 lastOffset: 4 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 516 CreateTime: 1571627852548 size: 119 magic: 2 compresscodec: NONE crc: 4102323319 isvalid: true
      | offset: 4 CreateTime: 1571627852548 keysize: 14 valuesize: 37 sequence: -1 headerKeys: [] key:
      TesttranId payload: ��`����m�Q���������

      Producer epoch: when the producer sends an InitProducerId request and the server already has state stored for the same transactionalId, it returns the previously assigned producerId to the client and increments the producer epoch by 1.

    • In short, the lookupCoordinator (FindCoordinator) and InitProducerId requests are sent back to back.

    2. KafkaProducer#beginTransaction()

    This method only initializes client-side state; it does not interact with the cluster.

    3. KafkaConsumer.poll()

      This method fetches data from Kafka. Following the consumer group protocol, the consumer sends a SYNC_GROUP request; when the GroupCoordinator receives it, it writes the consumer group information into the internal __consumer_offsets topic.

      Server side (GroupCoordinator): case ApiKeys.SYNC_GROUP => handleSyncGroupRequest(request)

      This handler writes a message to __consumer_offsets, similar to:
       baseOffset: 0 lastOffset: 0 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1571627856304 size: 311 magic: 2 compresscodec: NONE crc: 30386071 isvalid: true
      | offset: 0 CreateTime: 1571627856304 keysize: 14 valuesize: 227 sequence: -1 headerKeys: [] key:
      /consumer-1-99af3db9-1ffb-4fa5-a43a-2efd12f976d2��b9-1ffb-4fa5-a43a-2efd12f976d2m�Q�
      consumer-1
                /10.4.43.182����      tranTopic       tranTopic
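
      Which partition of __consumer_offsets a group's records land in is determined the same way the __transaction_state partition is chosen for a transactionalId (shown in PS2 below); a minimal sketch mirroring GroupMetadataManager#partitionFor, assuming the default of 50 offsets partitions:

        // Partition of __consumer_offsets holding a consumer group's metadata and offsets
        // (50 is the default value of offsets.topic.num.partitions).
        def offsetsPartitionFor(groupId: String, partitionCount: Int = 50): Int =
          Math.abs(groupId.hashCode) % partitionCount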
      
    4. KafkaProducer.send()

      KafkaProducer's send path consists of the following main steps:

      1. AddPartitionsToTxnRequest: before sending a message, the producer checks whether the message's TopicPartition is already part of the transaction; if not, it sends an AddPartitionsToTxnRequest to the Transaction Coordinator

        The server handles this request in handleAddPartitionToTxnRequest(request) and writes the TopicPartition information into __transaction_state (mainly updating the set of TopicPartitions and, possibly, the transaction start timestamp).

        Information written to disk:

        topic __transaction_state
        Key transactionalId
        Value private[transaction] case class TxnTransitMetadata(producerId: Long, producerEpoch: Short,txnTimeoutMs: Int, txnState: TransactionState, topicPartitions: immutable.Set[TopicPartition], txnStartTimestamp: Long, txnLastUpdateTimestamp: Long)
        Time time.milliseconds()

        The written message looks like:

        baseOffset: 5 lastOffset: 5 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 635 CreateTime: 1571627857554 size: 139 magic: 2 compresscodec: NONE crc: 3124473386 isvalid: true
        | offset: 5 CreateTime: 1571627857554 keysize: 14 valuesize: 56 sequence: -1 headerKeys: [] key:
        TesttranId payload: ��` TopicTestm�Q��m�Q��

      2. The produce request is sent to the leader of the corresponding TopicPartition

        The leader writes the messages into the corresponding log file via handleProduceRequest(request); the messages look like:

        baseOffset: 6 lastOffset: 7 count: 2 baseSequence: 0 lastSequence: 1 producerId: 5002 producerEpoch: 1 partitionLeaderEpoch: 0 isTransactional: true isControl: false position: 350 CreateTime: 1571627623516 size: 97 magic: 2 compresscodec: NONE crc: 1101266558 isvalid: true
        | offset: 6 CreateTime: 1571627623510 keysize: -1 valuesize: 9 sequence: 0 headerKeys: [] payload: FirstMess
        | offset: 7 CreateTime: 1571627623516 keysize: -1 valuesize: 13 sequence: 1 headerKeys: [] payload: SecondMessage

    5. KafkaProducer.sendOffsetsToTransaction()

      sendOffsetsToTransaction consists of two parts: sending an ADD_OFFSETS_TO_TXN request to the Transaction Coordinator, and sending a TXN_OFFSET_COMMIT request to the Group Coordinator.
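
      On the client side this is a single call: the offsets map built from the consumed records plus the consumer group id. A minimal sketch (producer as in the example near the start of this article; topic, partition and offset are placeholders):

        import java.util.Collections
        import org.apache.kafka.clients.consumer.OffsetAndMetadata
        import org.apache.kafka.clients.producer.KafkaProducer
        import org.apache.kafka.common.TopicPartition

        // Commit the consumed position as part of the producer's open transaction.
        // The committed value is the next offset to read, i.e. last processed offset + 1.
        def commitConsumedOffset(producer: KafkaProducer[String, String], lastProcessedOffset: Long): Unit = {
          val offsets = Collections.singletonMap(
            new TopicPartition("tranTopic", 0), new OffsetAndMetadata(lastProcessedOffset + 1))
          // "MyConsumer" is the consumer group id: ADD_OFFSETS_TO_TXN goes to the Transaction
          // Coordinator, then TXN_OFFSET_COMMIT to the Group Coordinator.
          producer.sendOffsetsToTransaction(offsets, "MyConsumer")
        }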

      1. The Transaction Coordinator, via handleAddPartitionToTxnRequest(request), writes the relevant information into __transaction_state; the message looks like:

        topic __transaction_state
        Key transactionalId
        Value private[transaction] case class TxnTransitMetadata(producerId: Long, producerEpoch: Short,txnTimeoutMs: Int, txnState: TransactionState, topicPartitions: immutable.Set[TopicPartition], txnStartTimestamp: Long, txnLastUpdateTimestamp: Long)
        Time time.milliseconds()

        baseOffset: 6 lastOffset: 6 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 774 CreateTime: 1571627857608 size: 168 magic: 2 compresscodec: NONE crc: 3868568548 isvalid: true
        | offset: 6 CreateTime: 1571627857608 keysize: 14 valuesize: 84 sequence: -1 headerKeys: [] key:
        TesttranId payload: ��`__consumer_offsets TopicTestm�Q��m�Q��

        At this point the __consumer_offsets partition corresponding to the consumer's group id is added to the transaction's partition set in __transaction_state

      2. The Group Coordinator writes the offsets into __consumer_offsets via handleTxnOffsetCommitRequest; the message format is relatively complex, see GroupMetadataManager#storeOffsets

        baseOffset: 1 lastOffset: 1 count: 1 baseSequence: 0 lastSequence: 0 producerId: 5002 producerEpoch: 1 partitionLeaderEpoch: 0 isTransactional: true isControl: false position: 311 CreateTime: 1571627858221 size: 137 magic: 2 compresscodec: NONE crc: 1296828467 isvalid: true
        | offset: 1 CreateTime: 1571627858221 keysize: 29 valuesize: 39 sequence: 0 headerKeys: [] key:
        MyConsumer tranTopic payload: ����sfsdfesdfsdssdfm�Q�'

    6. KafkaProducer.commitTransaction() / KafkaProducer.abortTransaction()

      Both methods end the transaction from the client's point of view. Each sends an END_TXN request directly to the Transaction Coordinator, which handles it in handleEndTxnRequest(request).
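
      On the client side the usual pattern is to commit on success and abort on failure; a minimal sketch (producer as in the example near the start of this article):

        import org.apache.kafka.common.KafkaException
        import org.apache.kafka.common.errors.ProducerFencedException

        try {
          // ... the transaction's send() / sendOffsetsToTransaction() calls ...
          producer.commitTransaction()    // END_TXN with result = COMMIT
        } catch {
          case _: ProducerFencedException =>
            producer.close()              // fatal: another producer with the same transactional.id took over
          case _: KafkaException =>
            producer.abortTransaction()   // END_TXN with result = ABORT; processing can be retried
        }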

      1. After receiving the client's end-transaction request, the Transaction Coordinator first writes PrepareCommit/PrepareAbort into __transaction_state; the message looks like:

        topic __transaction_state
        Key transactionalId
        Value private[transaction] case class TxnTransitMetadata(producerId: Long, producerEpoch: Short,txnTimeoutMs: Int, txnState: TransactionState, topicPartitions: immutable.Set[TopicPartition], txnStartTimestamp: Long, txnLastUpdateTimestamp: Long)
        Time time.milliseconds()

      Request: EndTxnRequest(version, transactionalId, producerId, producerEpoch, result)
       baseOffset: 7 lastOffset: 7 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 942 CreateTime: 1571627858574 size: 168 magic: 2 compresscodec: NONE crc: 3579425289 isvalid: true
       | offset: 7 CreateTime: 1571627858574 keysize: 14 valuesize: 84 sequence: -1 headerKeys: [] key:
       TesttranId payload: ��`__consumer_offsets       TopicTestm�Q��m�Q��
      
      2. After this write completes, the TransactionCoordinator sends a WRITE_TXN_MARKERS request to the leaders of all TopicPartitions that received messages in the transaction, as well as to the leader of the __consumer_offsets partition for the groupId. Each broker receiving the request writes the transaction marker into its log via handleWriteTxnMarkersRequest(request), for example:

        TopicPartition :
        baseOffset: 8 lastOffset: 8 count: 1 baseSequence: -1 lastSequence: -1 producerId: 5002 producerEpoch: 1 partitionLeaderEpoch: 0 isTransactional: true isControl: true position: 447 CreateTime: 1571627858578 size: 78 magic: 2 compresscodec: NONE crc: 1728728300 isvalid: true
        | offset: 8 CreateTime: 1571627858578 keysize: 4 valuesize: 6 sequence: -1 headerKeys: [] endTxnMarker: COMMIT coordinatorEpoch: 0

      3. Once every TopicPartition has written the transaction marker to its log, the Transaction Coordinator writes the transaction-complete state into __transaction_state; the message looks like:

        topic __transaction_state
        Key transactionalId
        Value private[transaction] case class TxnTransitMetadata(producerId: Long, producerEpoch: Short,txnTimeoutMs: Int, txnState: TransactionState, topicPartitions: immutable.Set[TopicPartition], txnStartTimestamp: Long, txnLastUpdateTimestamp: Long)
        Time time.milliseconds()

        baseOffset: 8 lastOffset: 8 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 1110 CreateTime: 1571627858584 size: 119 magic: 2 compresscodec: NONE crc: 2367176543 isvalid: true
        | offset: 8 CreateTime: 1571627858584 keysize: 14 valuesize: 37 sequence: -1 headerKeys: [] key:
        TesttranId payload: ��`m�Q��m�Q��

      PS1: As can be seen above, every message the TransactionCoordinator writes to __transaction_state has the same format.

      PS2: Kafka's internal topics (__transaction_state, __consumer_offsets) can be inspected with the console consumer that ships with Kafka. Consuming them directly prints unreadable garbage, but specifying a message formatter displays the content in a readable form. For example, for a producer whose transactionalId is "TesttranId", the transaction records can be inspected as follows:

      1. Find the __transaction_state partition for the transactionalId

        scala> Math.abs("TesttranId".hashCode)%50
        res0: Int = 48

      2. View the contents of that partition with the following command:

        kafka-console-consumer.sh --topic __transaction_state --partition 48 --bootstrap-server 10.1.236.66:9092 --formatter "kafka.coordinator.transaction.TransactionLog\$TransactionLogMessageFormatter"

      3. The printed record looks like this (note that it matches the storage format described above):

        TesttranId::TransactionMetadata(transactionalId=TesttranId, producerId=9075, producerEpoch=3, txnTimeoutMs=60000, state=Ongoing, pendingState=None, topicPartitions=Set(__consumer_offsets-22, TopicTest-0), txnStartTimestamp=1572316082745, txnLastUpdateTimestamp=1572316082795)

      For the __transaction_state and __consumer_offsets topics, the following formatter classes can be used to print their stored records:

      Topic Formatter class
      __transaction_state "kafka.coordinator.transaction.TransactionLog$TransactionLogMessageFormatter"
      __consumer_offsets "kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter"
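
      The same approach works for __consumer_offsets. In this article's run the group's records live in partition 22 (it appears as __consumer_offsets-22 in the transaction metadata above), so, assuming the same broker address as before:

        kafka-console-consumer.sh --topic __consumer_offsets --partition 22 --bootstrap-server 10.1.236.66:9092 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter"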

      PS3: Senders and responders of all the interactions above:

      Request Sender Responder
      Find Transaction Coordinator producer client any broker
      Obtain PID (InitProducerId) producer client Transaction Coordinator
      SYNC_GROUP consumer client Group Coordinator
      AddPartitionsToTxnRequest producer client Transaction Coordinator
      Produce producer client leader of the TopicPartition
      ADD_OFFSETS_TO_TXN producer client Transaction Coordinator
      TXN_OFFSET_COMMIT producer client Group Coordinator
      END_TXN producer client Transaction Coordinator
      WRITE_TXN_MARKERS Transaction Coordinator leaders of all involved partitions
