美文网首页kafkaKafka
Kafka源码分析(七)消息发送可靠性——min.insync.

Kafka源码分析(七)消息发送可靠性——min.insync.

作者: 81e2cd2747f1 | 来源:发表于2020-02-28 18:22 被阅读0次

    继续解答问题:
    Kafka怎么样才能不丢消息?

    再次说明下acks的语义
    如果acks=all,代表消息需要在所有ISR都被持久化成功后,Broker才可以告诉生产者,消息已经发送成功了。

    acks=all时,消息需要在所有ISR都被持久化成功后,这里为什么不设计成,消息需要在所有的Follower都被持久化成功后。这个当然是有原因的,之后的文章再详细讲解这个设计问题。

    这里就涉及到一个关键的问题,3个ISR不代表3个Node(1 Leader + 2 Followers)。如果在一些特殊情况下,部分在ISR中的Follower被被剔除ISR,极端情况下,最后的ISR中只有Leader一个,消息被Leader持久化成功后,就给触发了DelayedProduce的tryComplete,因为完全符合上面表达的语义:如果acks=all,代表消息需要在所有ISR都被持久化成功后,Broker才可以告诉生产者,消息已经发送成功了,只是这次所有的ISR就Leader自己。

    // ISR缩减任务
    def maybeShrinkIsr(replicaMaxLagTimeMs: Long) {
        val leaderHWIncremented = inWriteLock(leaderIsrUpdateLock) {
        leaderReplicaIfLocal match {
        case Some(leaderReplica) =>
            // 获取那些没有跟上的Follower
            val outOfSyncReplicas = getOutOfSyncReplicas(leaderReplica, replicaMaxLagTimeMs)
            if(outOfSyncReplicas.nonEmpty) {
            val newInSyncReplicas = inSyncReplicas -- outOfSyncReplicas
            assert(newInSyncReplicas.nonEmpty)
            info("Shrinking ISR from %s to %s".format(inSyncReplicas.map(_.brokerId).mkString(","),
                newInSyncReplicas.map(_.brokerId).mkString(",")))
            // update ISR in zk and in cache
            // 把那些没有跟上的Follower从ISR中剔除
            updateIsr(newInSyncReplicas)
            // code
        }
    }
    
    // Follower是否跟上Leader的标准
    def getOutOfSyncReplicas(maxLagMs: Long): Set[Int] = {
      // Leader统计每一个Follower的LastCaughtUpTime(上一次追赶上的时间),如果和当前时间比大于10s。
      val candidateReplicaIds = inSyncReplicaIds - localBrokerId
      val currentTimeMs = time.milliseconds()
      val leaderEndOffset = localLogOrException.logEndOffset
      candidateReplicaIds.filter(replicaId => isFollowerOutOfSync(replicaId, leaderEndOffset, currentTimeMs, maxLagMs))
    }
    

    这时如果Leader挂了,消息还是丢了。

    问题就出在ISR的数量是可以忽多忽少的,ISR的数量变成1的时候,情况就完全和ISR=1是相同的。

    Kafka当然是可以通过参数来限制ISR的数量的: min.insync.replicas = n,代表的语义是,如果生产者acks=all,而在发送消息时,Broker的ISR数量没有达到n,Broker不能处理这条消息,需要直接给生产者报错。

    当然这个语义的解释已经足够清晰得表达了下面这段代码的意思

    def appendRecordsToLeader(records: MemoryRecords, isFromClient: Boolean, requiredAcks: Int = 0): LogAppendInfo = {
        val (info, leaderHWIncremented) = inReadLock(leaderIsrUpdateLock) {
            leaderReplicaIfLocal match {
            case Some(leaderReplica) =>
                val log = leaderReplica.log.get
                val minIsr = log.config.minInSyncReplicas
                val inSyncSize = inSyncReplicas.size
    
                // Avoid writing to leader if there are not enough insync replicas to make it safe
                if (inSyncSize < minIsr && requiredAcks == -1) {
                throw new NotEnoughReplicasException("Number of insync replicas for partition %s is [%d], below required minimum [%d]"
                    .format(topicPartition, inSyncSize, minIsr))
                }
    
                // code
        }
        // code
    }
    

    这样的Fast-Fail处理,在当ISR不足时,也能够避由于Leader宕机引起的消息丢失。

    问题解决了么?还没有,请看下篇分解。

    相关文章

      网友评论

        本文标题:Kafka源码分析(七)消息发送可靠性——min.insync.

        本文链接:https://www.haomeiwen.com/subject/qavlhhtx.html