美文网首页
深入理解Kafka设计:RequestPurgatory(2)

深入理解Kafka设计:RequestPurgatory(2)

作者: 小吴酱呵呵 | 来源:发表于2017-06-11 16:14 被阅读0次
    版权声明:本文为博主原创文章,未经博主允许不得转载。
    

    摘要

    上一节 深入理解Kafka设计:RequestPurgatory(1),从类结构设计方面介绍了RequestPurgatory相关的几个主要组件类的功能,这一节将从源码层面来分析RequestPurgatory是如何处理延迟请求的。

    RequestPurgatory工作原理

    Kafka的延迟请求分两种类型:一种是Produce延迟请求,需要等待请求涉及到的数据都被其他follower同步;另一种则是Fetch延迟请求,为了避免不断的轮询,Fetch请求会等待有足够的数据量才返回。下面以Produce延迟请求为例来分析RequestPurgatory的工作原理。

    处理DelayedRequest

    图1.处理延迟ProduceRequest的时序图

    KafkaRequestHandler处理请求的过程已经在深入理解Kafka设计:高性能服务器模型(1)中介绍过。这里直接从KafkaApis处理发送请求的方法入手。

    KafkaApis.handleProducerOrOffsetCommitRequest用于接收Producer发送过来的新消息,然后将其持久化,并根据请求参数requiredAcks的值决定是否立即返回响应或将请求延迟处理。

    def handleProducerOrOffsetCommitRequest(request: RequestChannel.Request) {
        .....
        val localProduceResults = appendToLocalLog(produceRequest, offsetCommitRequestOpt.nonEmpty)//持久化消息数据
        .....
        if(produceRequest.requiredAcks == 0) {//producer不需要响应
        .....
        } else if (produceRequest.requiredAcks == 1 ||
            produceRequest.numPartitions <= 0 ||
            numPartitionsInError == produceRequest.numPartitions) {//producer只需要leader成功接受到消息即可
        .....
        } else {//producer需要所有follower成功同步到消息
                //为这个延迟请求生成(topic, parition)列表,Wathcers会将列表中的元素用作key
                val producerRequestKeys = produceRequest.data.keys.toSeq
                val statuses = localProduceResults.map(r =>
            r.key -> DelayedProduceResponseStatus(r.end + 1, ProducerResponseStatus(r.errorCode, r.start))).toMap
                val delayedRequest =  new DelayedProduce(
                    producerRequestKeys,
                    request,
                    produceRequest.ackTimeoutMs.toLong,
                    produceRequest,
                    statuses,
                    offsetCommitRequestOpt)//构建延迟请求
            
                //延迟请求交给producerRequestPurgatory
                val satisfiedByMe = producerRequestPurgatory.checkAndMaybeWatch(delayedRequest)
                if (satisfiedByMe)
                    producerRequestPurgatory.respond(delayedRequest)
        }
        .....
     }
    

    当requiredAcks>1时,会将原请求封装成延迟请求DelayedProduce,并交予producerRequestPurgatory来处理。如果该请求此时已经满足条件(即其他follower都已经成功同步了数据),那么会立即返回响应给producer。

    ProducerRequestPurgatory.checkAndMaybeWatch是处理DelayedProduce的唯一入口,他会先检查请求是否满足条件,否则再延迟处理请求,并返回false。

    def checkAndMaybeWatch(delayedRequest: T): Boolean = {
        if (delayedRequest.keys.size <= 0)//如果请求不包含消息,那么已满足条件
          return isSatisfiedByMe(delayedRequest)
    
        var isSatisfied = delayedRequest synchronized checkSatisfied(delayedRequest)//首先检查请求是否已满足条件
        if (isSatisfied)
          return isSatisfiedByMe(delayedRequest)//更新isSatisied标志位,reaper线程会使用到
    
        //由于一个请求可能包含多个topic-partition的数据,每个topic-partition都需要做条件检查
        for(key <- delayedRequest.keys) {
          val lst = watchersFor(key)//为该topic-partition创建Watcher
          if (!lst.addIfNotSatisfied(delayedRequest)) {//如果request还没有满足条件,那么添加到该key对应的Watchers里面
            return false//如果已经满足条件,那么为了节约开销,不再检查余下的key
          }
        }
    
        isSatisfied = delayedRequest synchronized checkSatisfied(delayedRequest)//再一次检查请求是否满足条件
        if (isSatisfied)
          return isSatisfiedByMe(delayedRequest)
        else {
          //仍然不满足时,需要加入超时控制
          expiredRequestReaper.enqueue(delayedRequest)
          return false
        }
      }
    

    一般来说,大部分请求很快便会满足条件,所以接连两次调用checkSatisfied,以减轻内存溢出风险。如果两次检查仍然没有满足条件,那么该请求也要控制超时时间了。

    ExpiredRequestReaper的工作

    ExpiredRequestReaper线程随Broker启动而启动,其目的是不断地检查超时请求,触发RequestPurgatory处理超时响应,并帮助其清理超时请求对象以缓解内存压力。

    图2.ExpiredRequestReaper线程工作时序图

    ExpiredRequestReaper.run中的主循环可以看出该线程的主要逻辑:

    def run() {
          while(running.get) {
            try {
              val curr = pollExpired()//获取下一个超时请求
              if (curr != null) {
                curr synchronized {
                  expire(curr)//触发RequestPurgatory执行请求超时的动作,具体行为由子类来实现
                }
              }
              //RequestPurgatory监听的请求个数超过1000时
              if (RequestPurgatory.this.watched() >= purgeInterval) {
                val numPurgedFromWatchers = watchersForKey.values.map(_.purgeSatisfied()).sum//清除已满足条件的请求
              }
              //当超时请求列表超过1000时
              if (delayed() >= purgeInterval) {
                val purged = purgeSatisfied()//清除已满足条件的请求
              }
            } catch {
                .....
            }
          }
          .....
        }
        
        //获取下一个超时请求
        private def pollExpired(): T = {
          while(true) {
            val curr = delayedQueue.poll(200L, TimeUnit.MILLISECONDS)//获取第一个超时的对象
            if (curr == null)
              return null.asInstanceOf[T]
            val updated = curr.satisfied.compareAndSet(false, true)//更新statisfied标志位
            if(updated) {
              return curr
            }
          }
          throw new RuntimeException("This should not happen")
        }
    

    不论是ExpiredRequestReaper.purgeSatisfied还是Watchers.purgeSatisfied,都是遍历他们的Request列表(数据结构不一样,ExpiredRequestReaper是DelayedQueue实现,另一个则是LinkedList),然后从中remove掉已经满足条件的Request对象,使得他们可以被GC回收掉,达到最终目的。

    释放DelayedRequest

    图3.ExpiredRequestReaper线程工作时序图

    上面提到DelayedProduce的创建、清理和超时控制,那么他的条件在什么时候会被满足呢?肯定是在数据同步以后了。KafkaApis.handleFetchRequest不仅是处理消费者拉取消息的逻辑,同时也是follower同步leader数据的逻辑入口,所以从这个方法入手来分析。

    def handleFetchRequest(request: RequestChannel.Request) {
        val fetchRequest = request.requestObj.asInstanceOf[FetchRequest]
        val dataRead = replicaManager.readMessageSets(fetchRequest)//根据topic,partition,offset,size等参数来读取相应的消息数据
    
        //如果是从follower过来的请求
        if(fetchRequest.isFromFollower)
          recordFollowerLogEndOffsets(fetchRequest.replicaId, dataRead.mapValues(_.offset))//那么更新内存中replica的相应offset
          
        .....
        if(fetchRequest.maxWait <= 0 ||//请求不需要等待
           fetchRequest.numPartitions <= 0 ||//请求不获取任何数据
           bytesReadable >= fetchRequest.minBytes ||//已经获取到足够数据
           errorReadingData) {//读取消息时出错
          .....
          //立即返回响应
          val response = new FetchResponse(fetchRequest.correlationId, dataRead.mapValues(_.data))
          requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(response)))
        } else {//这里是处理DelayedFetch的逻辑,不再介绍
          .....
        }
      }
      
      private def recordFollowerLogEndOffsets(replicaId: Int, offsets: Map[TopicAndPartition, LogOffsetMetadata]) {
        .....
          case (topicAndPartition, offset) =>
            //更新该replicaId在这个Leader中所有topic-partition的offset
    replicaManager.updateReplicaLEOAndPartitionHW(topicAndPartition.topic,
              topicAndPartition.partition, replicaId, offset)
    //此时要检查是否可以满足某些produce请求的条件了
    replicaManager.unblockDelayedProduceRequests(topicAndPartition)
        }
      }
      
      def unblockDelayedProduceRequests(key: TopicAndPartition) {
        val satisfied = producerRequestPurgatory.update(key)//更新key所对应的DelayedProduce
    
        //为每一个满足了条件的请求返回响应
        satisfied.foreach(producerRequestPurgatory.respond(_))
      }
    

    unblockDelayedProduceRequests是一个重要的方法,顾名思义,解锁Produce延迟请求。每一次follower成功获取到数据以后,除了要更新对应的offset外,最后还要解锁相应的Produce延迟请求。所以他调用了RequestPurgatory.update去更新topic-parition所对应的DelayedRequest状态。

    def update(key: Any): Seq[T] = {
        val w = watchersForKey.get(key)
        if(w == null)
          Seq.empty
        else
          w.collectSatisfiedRequests()
      }
    

    Watchers.collectSatisfiedRequests会找到topic-parition所对应的Watchers对象,然后遍历其Request列表并触发条件检查,并返回已满足的请求。

    def collectSatisfiedRequests(): Seq[T] = {
          val response = new mutable.ArrayBuffer[T]
          synchronized {
            val iter = requests.iterator()
            while(iter.hasNext) {
              val curr = iter.next
              if(curr.satisfied.get) {
                iter.remove()//这里也会去除已满足条件的请求
              } else {
                val satisfied = curr synchronized checkSatisfied(curr)
                if(satisfied) {
                  iter.remove()
                  val updated = curr.satisfied.compareAndSet(false, true)
                  if(updated == true) {
                    response += curr
                  }
                }
              }
            }
          }
          response
        }
    

    由于ExpiredRequestReaper线程也会触发所有Watchers遍历Request列表的动作,所以对每个Watchers对象做了同步控制。另外在RequestPurgatory.checkAndMaybeWatch方法中,业务线程也会执行checkSatisfied操作,跟这里的遍历有极小的可能会操作同一个request对象,所以也加入了同步控制curr synchronized checkSatisfied(curr)

    而且对一个老的延迟请求来说,Watchers.collectSatisfiedRequests这里是他更新状态的唯一入口了。

    Wathcers.checkSatisfied最终会调用DelayedRequest.isSatisfied,实现由具体子类来决定,看看DelayedProduce.isSatisfied

      def isSatisfied(replicaManager: ReplicaManager) = {
        //检查请求中每一个topic-partition
        partitionStatus.foreach { case (topicAndPartition, fetchPartitionStatus) =>
          // skip those partitions that have already been satisfied
          if (fetchPartitionStatus.acksPending) {//acksPending代表是否还有数据没有同步完
            val partitionOpt = replicaManager.getPartition(topicAndPartition.topic, topicAndPartition.partition)
            val (hasEnough, errorCode) = partitionOpt match {
              case Some(partition) =>
                partition.checkEnoughReplicasReachOffset(
                  fetchPartitionStatus.requiredOffset,
                  produce.requiredAcks)//检查该topic-partition的所有备份都已经追上了该请求的offset
              case None =>
                (false, ErrorMapping.UnknownTopicOrPartitionCode)
            }
            if (errorCode != ErrorMapping.NoError) {
              fetchPartitionStatus.acksPending = false
              fetchPartitionStatus.responseStatus.error = errorCode
            } else if (hasEnough) {
              fetchPartitionStatus.acksPending = false
              fetchPartitionStatus.responseStatus.error = ErrorMapping.NoError
            }
          }
        }
    
        //如果该请求的所有topic-partition都已满足条件
        val satisfied = ! partitionStatus.exists(p => p._2.acksPending)
        satisfied
      }
    

    DelayedProduce经过isSatisfied方法检查,确认条件已经满足,那么isSatisfied标志位被置为true,等待ExpiredRequestReaper定时执行清理,从Watchers和Reaper的列表中移除,最终被GC回收掉。完成Produce延迟请求的生命周期。

    缺陷

    上一节也提到了Reaper线程正是为了缓解内存压力而设计出来的,通过设置工作频率(purgeInterval)可以使其及时的清理已满足条件的延迟请求。但是工作频率设置过高,那么reaper线程会频繁地遍历Wathcers和他的Request列表(同步控制),带来了性能损耗。因此,kafka在0.9.x引入了新的设计来解决这个问题。

    总结

    这一节通过跟踪DelayedProduce请求的处理流程,梳理了其生命周期,也从中明白了RequestPurgatory的工作原理与不足。下一节,将介绍新的设计是如何解决这个问题的。

    相关文章

      网友评论

          本文标题:深入理解Kafka设计:RequestPurgatory(2)

          本文链接:https://www.haomeiwen.com/subject/ndihqxtx.html