Kafka Source Code Analysis (6): Message Send Reliability (acks)


Author: 81e2cd2747f1 | Published on 2020-02-28 17:42

    The next few articles on reliability all revolve around one classic question:
    How can Kafka avoid losing messages?

    What counts as losing a message? If the client calls future = send(msg, callback) and an error is raised along the way, that is not a lost message. A message is truly lost when the client has called future = send(msg, callback), the broker has explicitly told the client that the message was sent successfully (future.isDone is true, or the callback's onSuccess is invoked), and yet consumers can never consume that record.
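
    As a concrete reference, here is a minimal client-side sketch against the standard Java producer API (the broker address, topic name, and object name are placeholders): a send only counts as successful once the returned future completes normally, or the callback is invoked with a null exception.

    import java.util.Properties
    import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata}
    import org.apache.kafka.common.serialization.StringSerializer

    object SendDemo extends App {
        val props = new Properties()
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // placeholder broker
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)

        val producer = new KafkaProducer[String, String](props)
        // the broker has acknowledged the message only when the callback sees a null exception
        // (equivalently, when future.get() returns instead of throwing)
        val future = producer.send(new ProducerRecord("test-topic", "key", "msg"), new Callback {
            override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit =
                if (exception == null) println(s"acked: partition=${metadata.partition()} offset=${metadata.offset()}")
                else println(s"send failed: ${exception.getMessage}")
        })
        future.get() // blocks until the acknowledgement (or an error) arrives
        producer.close()
    }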

    On the producer there is a setting named acks:
    If acks=0, the message is considered successfully sent as soon as it reaches the socket buffer. This is clearly unsafe, so we will not discuss it further;
    If acks=1, the broker can tell the producer the message was sent successfully as soon as one replica in the ISR (the leader) has persisted it;
    If acks=all, the broker can only tell the producer the message was sent successfully after every replica in the ISR has persisted it.
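
    For everything that follows, the only producer setting being varied is this acks property. A minimal sketch (the helper object is hypothetical, purely for illustration):

    import java.util.Properties
    import org.apache.kafka.clients.producer.ProducerConfig

    // hypothetical helper: build producer properties for one of the three acks modes
    object AcksConfig {
        def withAcks(acks: String): Properties = {
            val props = new Properties()
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // placeholder broker
            // "0"   -> socket buffer only, no broker guarantee
            // "1"   -> the partition leader has persisted the record locally
            // "all" -> every in-sync replica has persisted the record ("-1" means the same)
            props.put(ProducerConfig.ACKS_CONFIG, acks)
            props
        }
    }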

    Suppose the broker's replication factor for the test topic is 3, so under normal conditions the ISR contains 3 replicas.
    First, configure the producer with acks=1.
    After the message reaches the broker, it is handled by the leader of that topic partition. The leader calls appendToLocalLog to persist the message locally.

    // kafka.server.ReplicaManager#appendRecords
    val localProduceResults = appendToLocalLog(internalTopicsAllowed = internalTopicsAllowed,
            isFromClient = isFromClient, entriesPerPartition, requiredAcks)
    

    After the message has been persisted, suppose the leader immediately sent a response telling the producer that the message was sent successfully. If the leader then crashed before the followers had replicated the message, the message would be lost: consumers would have no way to consume it.

    Now configure the producer with acks=all (acks=-1/all).
    After the leader calls appendToLocalLog to persist the message locally, it does not reply to the producer right away. Instead, it creates a DelayedProduce (a delayed produce operation).

    if (delayedProduceRequestRequired(requiredAcks, entriesPerPartition, localProduceResults)) {
        // create delayed produce operation
        val produceMetadata = ProduceMetadata(requiredAcks, produceStatus)
        val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback, delayedProduceLock)
    
        // create a list of (topic, partition) pairs to use as keys for this delayed produce operation
        val producerRequestKeys = entriesPerPartition.keys.map(new TopicPartitionOperationKey(_)).toSeq
    
        // try to complete the request immediately, otherwise put it into the purgatory
        // this is because while the delayed produce operation is being created, new
        // requests may arrive and hence make this operation completable.
        delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys)
    } else {
        // we can respond immediately
        val produceResponseStatus = produceStatus.mapValues(status => status.responseStatus)
        responseCallback(produceResponseStatus)
    }
    
    // If all the following conditions are true, we need to put a delayed produce request and wait for replication to complete
    //
    // 1. required acks = -1
    // 2. there is data to append
    // 3. at least one partition append was successful (fewer errors than partitions)
    private def delayedProduceRequestRequired(requiredAcks: Short,
                                            entriesPerPartition: Map[TopicPartition, MemoryRecords],
                                            localProduceResults: Map[TopicPartition, LogAppendResult]): Boolean = {
        requiredAcks == -1 &&
        entriesPerPartition.nonEmpty &&
        localProduceResults.values.count(_.exception.isDefined) < entriesPerPartition.size
    }
    

    At this point the producer has not yet been told that the message was sent successfully. Even if the leader crashes now, the message does not count as lost; the producer just needs to send it again.
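
    In practice this resend is normally done by the client itself rather than by application code; a hedged sketch of the relevant settings (property names from the standard Java client, values purely illustrative, and delivery.timeout.ms assumes a reasonably recent client):

    import java.util.Properties
    import org.apache.kafka.clients.producer.ProducerConfig

    // hypothetical helper: have an unacknowledged send retried by the client
    // before the failure is surfaced to the application
    object RetrySettings {
        def withRetries(base: Properties): Properties = {
            val props = new Properties()
            props.putAll(base)
            props.put(ProducerConfig.RETRIES_CONFIG, "10")                 // resend up to 10 times
            props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "120000") // give up after 2 minutes in total
            props
        }
    }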

    But the story is not over. As just described, the leader has only created a DelayedProduce, so when does it actually reply to the producer? That is decided by the DelayedProduce itself, and the delay cannot last forever. Looking at DelayedProduce's tryComplete method, the delayed operation is allowed to complete as soon as the condition below is satisfied.

    // kafka.server.DelayedProduce#tryComplete
    override def tryComplete(): Boolean = {
        // check for each partition if it still has pending acks
        produceMetadata.produceStatus.foreach { case (topicPartition, status) =>
            if (status.acksPending) {
            val (hasEnough, error) = replicaManager.getPartition(topicPartition) match {
                // code 
                // there are many branches that return false, but for now we only care about the case that returns true, so step into this call
                partition.checkEnoughReplicasReachOffset(status.requiredOffset)
                // code
            }
            // code return false
        }
    
        // ..
    }
    
    def checkEnoughReplicasReachOffset(requiredOffset: Long): (Boolean, Errors) = {
        leaderReplicaIfLocal match {
            case Some(leaderReplica) =>
            // code

            val minIsr = leaderReplica.log.get.config.minInSyncReplicas
            // enough replicas in the ISR have caught up to this message
            if (leaderReplica.highWatermark.messageOffset >= requiredOffset) {
                /*
                * The topic may be configured not to accept messages if there are not enough replicas in ISR
                * in this scenario the request was already appended locally and then added to the purgatory before the ISR was shrunk
                */
                if (minIsr <= curInSyncReplicas.size)
                (true, Errors.NONE)
                else
                (true, Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND)
            } else
                (false, Errors.NONE)
            case None =>
            (false, Errors.NOT_LEADER_FOR_PARTITION)
        }
    }
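
    The minIsr check above comes from the topic-level min.insync.replicas setting: the message was already appended locally, but if the ISR has meanwhile shrunk below that threshold, the producer is answered with NOT_ENOUGH_REPLICAS_AFTER_APPEND. As a side note, here is a sketch of raising that threshold to 2 for a topic via the AdminClient (broker address and topic name are placeholders; incrementalAlterConfigs needs a reasonably recent kafka-clients):

    import java.util.{Collections, Properties}
    import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, AlterConfigOp, ConfigEntry}
    import org.apache.kafka.common.config.{ConfigResource, TopicConfig}

    object SetMinIsr extends App {
        val adminProps = new Properties()
        adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // placeholder broker

        val admin = AdminClient.create(adminProps)
        try {
            val topic = new ConfigResource(ConfigResource.Type.TOPIC, "test-topic")  // placeholder topic
            val op = new AlterConfigOp(
                new ConfigEntry(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2"), AlterConfigOp.OpType.SET)

            val configs = new java.util.HashMap[ConfigResource, java.util.Collection[AlterConfigOp]]()
            configs.put(topic, Collections.singletonList(op))

            // apply min.insync.replicas=2 and wait for the brokers to confirm
            admin.incrementalAlterConfigs(configs).all().get()
        } finally {
            admin.close()
        }
    }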
    

    To put this logic briefly: once enough replicas in the ISR have caught up with the message, the DelayedProduce completes and sends the success response back to the producer, which is the logic below.

    // kafka.server.DelayedProduce#onComplete
    override def onComplete() {
        val responseStatus = produceMetadata.produceStatus.mapValues(status => status.responseStatus)
        responseCallback(responseStatus)
    }
    

    Once the producer receives the broker's response, the message has been sent successfully. Now, even if the leader crashes, there is nothing to worry about: one of the surviving replicas in the ISR will be elected as the new leader (that logic is for a later article), and consumers will still be able to consume this message.

    Is the problem fully solved? Not yet. See the next article.
