Kafka Broker Source Code Analysis

Author: tracy_668 | Published on 2021-01-26 00:02

[TOC]

Module Composition

The broker consists mainly of SocketServer (socket service layer), KafkaRequestHandlerPool (request dispatching layer), KafkaApis (business logic layer), KafkaController (cluster state control layer), KafkaHealthcheck (broker health check layer) and TopicConfigManager (topic configuration monitoring layer). See the figure below:

[Figure: overall module architecture of the Kafka broker]
• SocketServer starts a single Acceptor thread to accept incoming socket connections, which it hands off to N Processor threads, where N = num.network.threads

• The N Processors put each request they receive into the blocking queue requestQueue

• M I/O threads take requests from the RequestChannel's blocking queue requestQueue and call KafkaApis to handle each kind of request, where M = num.io.threads (see the config snippet after this list)

• The broker handles 10 different kinds of requests: RequestKeys.ProduceKey, RequestKeys.FetchKey, RequestKeys.OffsetsKey, RequestKeys.MetadataKey, RequestKeys.LeaderAndIsrKey, RequestKeys.StopReplicaKey, RequestKeys.UpdateMetadataKey, RequestKeys.ControlledShutdownKey, RequestKeys.OffsetCommitKey and RequestKeys.OffsetFetchKey.

• KafkaApis (the business logic layer) implements the actual request handling together with ReplicaManager (replica management), LogManager (log management) and OffsetManager (offset management)

• After an I/O thread has processed a request, it puts the resulting response into the RequestChannel's response blocking queue responseQueues[i]

• Each Processor thread takes the responses for the requests it originally forwarded from its own response blocking queue responseQueues[i] and sends them back to the client

• KafkaController (the cluster state control layer) changes its own state through a ZooKeeper election; only one broker in the cluster becomes the controller, and it is responsible for topic creation and deletion, changes to a topic's partitions, changes to the replicas within a partition, and brokers going online or offline.

• KafkaHealthcheck (the broker health check layer) works by registering an ephemeral path in ZooKeeper

• TopicConfigManager (the topic configuration monitoring layer) reacts to changes in topic configuration
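
For reference, the two thread counts mentioned above come from ordinary broker settings in server.properties. A minimal sketch (the values are illustrative, not recommendations):

# server.properties (illustrative values)
num.network.threads=3     # N: Processor threads started by SocketServer
num.io.threads=8          # M: KafkaRequestHandler threads that invoke KafkaApis
queued.max.requests=500   # capacity of the RequestChannel's requestQueue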

Sources of the Requests Handled by the Broker

[Figure: where each kind of broker request originates]

The broker handles 10 kinds of requests (dispatched as in the sketch after this list):

① ProducerRequest: sent by producers to deliver messages to the Kafka cluster, or by consumers committing offsets into Kafka's log

② TopicMetadataRequest: sent by producers or consumers to fetch a topic's metadata

③ FetchRequest: sent by consumers, or by a ReplicaFetcherThread, to fetch messages

④ OffsetRequest: sent by consumers to get the offsets available for a topic

⑤ OffsetCommitRequest: sent by consumers to commit offsets to Kafka (internally committed to ZooKeeper or to a log, depending on configuration)

⑥ OffsetFetchRequest: sent by consumers to fetch the offsets they committed to Kafka (when offsets are stored in ZooKeeper, the consumer fetches them itself)

⑦ LeaderAndIsrRequest: sent by the controller to notify the affected brokers when the leader or ISR of a topic partition changes (for example, because the leader died)

⑧ StopReplicaRequest: sent by the controller to tell the affected brokers to stop replicating when a broker is being stopped or a partition replica of a topic is deleted

⑨ UpdateMetadataRequest: sent by the controller to notify the affected brokers when topic metadata changes

⑩ ControlledShutdownRequest: received by the broker that is acting as controller when some broker in the cluster is about to shut down
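
These request keys are dispatched inside KafkaApis.handle. Below is a simplified sketch of that dispatch; the handler method names vary slightly between 0.8.x versions, so treat them as placeholders rather than the literal source:

def handle(request: RequestChannel.Request) {
  request.requestId match {
    case RequestKeys.ProduceKey            => handleProducerRequest(request)
    case RequestKeys.FetchKey              => handleFetchRequest(request)
    case RequestKeys.OffsetsKey            => handleOffsetRequest(request)
    case RequestKeys.MetadataKey           => handleTopicMetadataRequest(request)
    case RequestKeys.LeaderAndIsrKey       => handleLeaderAndIsrRequest(request)
    case RequestKeys.StopReplicaKey        => handleStopReplicaRequest(request)
    case RequestKeys.UpdateMetadataKey     => handleUpdateMetadataRequest(request)
    case RequestKeys.ControlledShutdownKey => handleControlledShutdownRequest(request)
    case RequestKeys.OffsetCommitKey       => handleOffsetCommitRequest(request)
    case RequestKeys.OffsetFetchKey        => handleOffsetFetchRequest(request)
    case requestId => throw new KafkaException("Unknown api code " + requestId)
  }
}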

SocketServer

class SocketServer(val brokerId: Int,
                   val host: String,
                   val port: Int,
                   val numProcessorThreads: Int,
                   val maxQueuedRequests: Int,
                   val sendBufferSize: Int,
                   val recvBufferSize: Int,
                   val maxRequestSize: Int = Int.MaxValue,
                   val maxConnectionsPerIp: Int = Int.MaxValue,
                   val connectionsMaxIdleMs: Long,
                   val maxConnectionsPerIpOverrides: Map[String, Int] ) extends Logging with KafkaMetricsGroup {
this.logIdent = "[Socket Server on Broker " + brokerId + "], "
private val time = SystemTime
private val processors = new Array[Processor](numProcessorThreads)
@volatile private var acceptor: Acceptor = null
val requestChannel = new RequestChannel(numProcessorThreads, maxQueuedRequests)
/* a meter to track the average free capacity of the network processors */
private val aggregateIdleMeter = newMeter("NetworkProcessorAvgIdlePercent", "percent", TimeUnit.NANOSECONDS)
/**
 * Start the socket server
 */
def startup() {
  val quotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides)
  for(i <- 0 until numProcessorThreads) {  // start num.network.threads Processor threads to handle network requests
    processors(i) = new Processor(i, 
                                  time, 
                                  maxRequestSize, 
                                  aggregateIdleMeter,
                                  newMeter("IdlePercent", "percent", TimeUnit.NANOSECONDS, Map("networkProcessor" -> i.toString)),
                                  numProcessorThreads, 
                                  requestChannel,
                                  quotas,
                                  connectionsMaxIdleMs)
    Utils.newThread("kafka-network-thread-%d-%d".format(port, i), processors(i), false).start()
  }

  newGauge("ResponsesBeingSent", new Gauge[Int] {
    def value = processors.foldLeft(0) { (total, p) => total + p.countInterestOps(SelectionKey.OP_WRITE) }
  })
  // register the processor threads for notification of responses: when a response is enqueued, the listener wakes up the owning Processor
  requestChannel.addResponseListener((id:Int) => processors(id).wakeup())
  // start accepting network connection requests
  this.acceptor = new Acceptor(host, port, processors, sendBufferSize, recvBufferSize, quotas)
  Utils.newThread("kafka-socket-acceptor", acceptor, false).start()
  acceptor.awaitStartup
  info("Started")
}
}

The Acceptor runs as a standalone thread. When it accepts a new connection, it hands the connection to one of the Processor threads in round-robin fashion, and that Processor then handles all subsequent requests on the connection.

private[kafka] class Acceptor(val host: String, 
                              val port: Int, 
                              private val processors: Array[Processor],
                              val sendBufferSize: Int, 
                              val recvBufferSize: Int,
                              connectionQuotas: ConnectionQuotas) extends AbstractServerThread(connectionQuotas) {
  val serverChannel = openServerSocket(host, port)

  /**
   * Accept loop that checks for new connection attempts
   */
  def run() {
    serverChannel.register(selector, SelectionKey.OP_ACCEPT);
    startupComplete()
    var currentProcessor = 0
    while(isRunning) {
      val ready = selector.select(500)
      if(ready > 0) {
        val keys = selector.selectedKeys()
        val iter = keys.iterator()
        while(iter.hasNext && isRunning) {
          var key: SelectionKey = null
          try {
            key = iter.next
            iter.remove()
            if(key.isAcceptable)
               accept(key, processors(currentProcessor)) // add the connection to this Processor's newConnections; that Processor then owns every request on the connection
            else
               throw new IllegalStateException("Unrecognized key state for acceptor thread.")
            // round robin to the next processor thread
            currentProcessor = (currentProcessor + 1) % processors.length
          } catch {
            case e: Throwable => error("Error while accepting connection", e)
          }
        }
      }
    }
    debug("Closing server socket and selector.")
    swallowError(serverChannel.close())
    swallowError(selector.close())
    shutdownComplete()
  }
}

So how does a Processor thread handle requests? The key is the requestChannel: it acts as the transport channel for requests and responses, so a Processor thread only receives requests from its connections and sends back the corresponding responses, completely decoupled from the actual business logic. Let's look at RequestChannel:

class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMetricsGroup {
  private var responseListeners: List[(Int) => Unit] = Nil
  private val requestQueue = new ArrayBlockingQueue[RequestChannel.Request](queueSize) // a single blocking queue of requests, consumed later by the KafkaRequestHandler threads
  private val responseQueues = new Array[BlockingQueue[RequestChannel.Response]](numProcessors) // num.network.threads blocking queues of responses, filled later by the KafkaRequestHandler threads
  for(i <- 0 until numProcessors)
    responseQueues(i) = new LinkedBlockingQueue[RequestChannel.Response]()
}
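
The rest of RequestChannel is essentially a set of thin wrappers around these two kinds of queues. A sketch of the relevant methods (paraphrased from the same class; exact signatures may differ between versions):

// Processor -> I/O thread: enqueue a request for a KafkaRequestHandler to pick up
def sendRequest(request: RequestChannel.Request) {
  requestQueue.put(request)
}

// I/O thread -> Processor: enqueue the response on the owning Processor's queue and notify it
def sendResponse(response: RequestChannel.Response) {
  responseQueues(response.processor).put(response)
  for (onResponse <- responseListeners)
    onResponse(response.processor)
}

// called by the KafkaRequestHandler threads
def receiveRequest(timeout: Long): RequestChannel.Request =
  requestQueue.poll(timeout, TimeUnit.MILLISECONDS)

// called by the Processor identified by `processor`
def receiveResponse(processor: Int): RequestChannel.Response =
  responseQueues(processor).poll()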

In other words, each Processor thread puts the requests from its own connections into requestQueue and later takes the responses to those requests from its own responseQueues(i), as shown in the figure below:


[Figure: Processor threads exchanging requests and responses through the RequestChannel queues]

The code is as follows:

private[kafka] class Processor(val id: Int,
                               val time: Time,
                               val maxRequestSize: Int,
                               val aggregateIdleMeter: Meter,
                               val idleMeter: Meter,
                               val totalProcessorThreads: Int,
                               val requestChannel: RequestChannel,
                               connectionQuotas: ConnectionQuotas,
                               val connectionsMaxIdleMs: Long) extends AbstractServerThread(connectionQuotas) {
override def run() {
  startupComplete()
  while(isRunning) {
    // setup any new connections that have been queued up
    configureNewConnections()
    // register any new responses for writing
    processNewResponses()  // pick up responses from this Processor's responseQueue and register them for writing
    val startSelectTime = SystemTime.nanoseconds
    val ready = selector.select(300)
    currentTimeNanos = SystemTime.nanoseconds
    val idleTime = currentTimeNanos - startSelectTime
    idleMeter.mark(idleTime)
    // We use a single meter for aggregate idle percentage for the thread pool.
    // Since meter is calculated as total_recorded_value / time_window and
    // time_window is independent of the number of threads, each recorded idle
    // time should be discounted by # threads.
    aggregateIdleMeter.mark(idleTime / totalProcessorThreads)

    trace("Processor id " + id + " selection time = " + idleTime + " ns")
    if(ready > 0) {
      val keys = selector.selectedKeys()
      val iter = keys.iterator()
      while(iter.hasNext && isRunning) {
        var key: SelectionKey = null
        try {
          key = iter.next
          iter.remove()
          if(key.isReadable)
            read(key)  // read a request from the connection
          else if(key.isWritable)
            write(key)  // write the response for the corresponding request
          else if(!key.isValid)
            close(key)
          else
            throw new IllegalStateException("Unrecognized key state for processor thread.")
        } catch {
          case e: EOFException => {
            info("Closing socket connection to %s.".format(channelFor(key).socket.getInetAddress))
            close(key)
          } case e: InvalidRequestException => {
            info("Closing socket connection to %s due to invalid request: %s".format(channelFor(key).socket.getInetAddress, e.getMessage))
            close(key)
          } case e: Throwable => {
            error("Closing socket for " + channelFor(key).socket.getInetAddress + " because of error", e)
            close(key)
          }
        }
      }
    }
    maybeCloseOldestConnection
  }
  debug("Closing selector.")
  closeAll()
  swallowError(selector.close())
  shutdownComplete()
}

def read(key: SelectionKey) {
  lruConnections.put(key, currentTimeNanos)
  val socketChannel = channelFor(key)
  var receive = key.attachment.asInstanceOf[Receive]
  if(key.attachment == null) {
    receive = new BoundedByteBufferReceive(maxRequestSize)
    key.attach(receive)
  }
  val read = receive.readFrom(socketChannel)
  val address = socketChannel.socket.getRemoteSocketAddress();
  trace(read + " bytes read from " + address)
  if(read < 0) {
    close(key)
  } else if(receive.complete) {
    val req = RequestChannel.Request(processor = id, requestKey = key, buffer = receive.buffer, startTimeMs = time.milliseconds, remoteAddress = address) // assemble the request
    requestChannel.sendRequest(req)  // hand the request over to the requestChannel
    key.attach(null)
    // explicitly reset interest ops to not READ, no need to wake up the selector just yet
    key.interestOps(key.interestOps & (~SelectionKey.OP_READ))
  } else {
    // more reading to be done
    trace("Did not finish reading, registering for read again on connection " + socketChannel.socket.getRemoteSocketAddress())
    key.interestOps(SelectionKey.OP_READ)
    wakeup()
  }
}

def write(key: SelectionKey) {
  val socketChannel = channelFor(key)
  val response = key.attachment().asInstanceOf[RequestChannel.Response]
  val responseSend = response.responseSend  // the serialized content of the response
  if(responseSend == null)
    throw new IllegalStateException("Registered for write interest but no response attached to key.")
  val written = responseSend.writeTo(socketChannel)  // send the response back over the connection's socket
  trace(written + " bytes written to " + socketChannel.socket.getRemoteSocketAddress() + " using key " + key)
  if(responseSend.complete) {
    response.request.updateRequestMetrics()
    key.attach(null)
    trace("Finished writing, registering for read on connection " + socketChannel.socket.getRemoteSocketAddress())
    key.interestOps(SelectionKey.OP_READ)
  } else {
    trace("Did not finish writing, registering for write again on connection " + socketChannel.socket.getRemoteSocketAddress())
    key.interestOps(SelectionKey.OP_WRITE)
    wakeup()
  }
}
}

KafkaRequestHandlerPool

The logic of KafkaRequestHandlerPool is straightforward: it starts num.io.threads KafkaRequestHandler threads. Each KafkaRequestHandler takes requests from RequestChannel.requestQueue and puts the corresponding responses into the responseQueues(i) queue.

class KafkaRequestHandlerPool(val brokerId: Int,
                              val requestChannel: RequestChannel,
                              val apis: KafkaApis,
                              numThreads: Int) extends Logging with KafkaMetricsGroup {

  /* a meter to track the average free capacity of the request handlers */
  private val aggregateIdleMeter = newMeter("RequestHandlerAvgIdlePercent", "percent", TimeUnit.NANOSECONDS)

  this.logIdent = "[Kafka Request Handler on Broker " + brokerId + "], "
  val threads = new Array[Thread](numThreads)
  val runnables = new Array[KafkaRequestHandler](numThreads)
  for(i <- 0 until numThreads) {  // create num.io.threads KafkaRequestHandler threads
    runnables(i) = new KafkaRequestHandler(i, brokerId, aggregateIdleMeter, numThreads, requestChannel, apis)
    threads(i) = Utils.daemonThread("kafka-request-handler-" + i, runnables(i))
    threads(i).start()
  }

  def shutdown() {
    info("shutting down")
    for(handler <- runnables)
      handler.shutdown
    for(thread <- threads)
      thread.join
    info("shut down completely")
  }
}
class KafkaRequestHandler(id: Int,
                          brokerId: Int,
                          val aggregateIdleMeter: Meter,
                          val totalHandlerThreads: Int,
                          val requestChannel: RequestChannel,
                          apis: KafkaApis) extends Runnable with Logging {
  this.logIdent = "[Kafka Request Handler " + id + " on Broker " + brokerId + "], "

  def run() {
    while(true) {
      try {
        var req : RequestChannel.Request = null
        while (req == null) {
          // We use a single meter for aggregate idle percentage for the thread pool.
          // Since meter is calculated as total_recorded_value / time_window and
          // time_window is independent of the number of threads, each recorded idle
          // time should be discounted by # threads.
          val startSelectTime = SystemTime.nanoseconds
          req = requestChannel.receiveRequest(300)  // take a request from RequestChannel.requestQueue
          val idleTime = SystemTime.nanoseconds - startSelectTime
          aggregateIdleMeter.mark(idleTime / totalHandlerThreads)
        }
        if(req eq RequestChannel.AllDone) {
          debug("Kafka request handler %d on broker %d received shut down command".format(
            id, brokerId))
          return
        }
        req.requestDequeueTimeMs = SystemTime.milliseconds
        trace("Kafka request handler %d on broker %d handling request %s".format(id, brokerId, req))
        apis.handle(req)  // delegate to KafkaApis for the real processing; the response ends up in the corresponding RequestChannel.responseQueues[i]
      } catch {
        case e: Throwable => error("Exception when handling request", e)
      }
    }
  }
  def shutdown(): Unit = requestChannel.sendRequest(RequestChannel.AllDone)
}
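
For context, the pieces covered so far are wired together in KafkaServer.startup(). A compressed sketch with abbreviated constructor arguments (consult your version's KafkaServer for the exact parameter lists):

// inside KafkaServer.startup(), roughly:
socketServer = new SocketServer(config.brokerId, config.hostName, config.port,
                                config.numNetworkThreads, config.queuedMaxRequests, /* ... */)
socketServer.startup()

apis = new KafkaApis(socketServer.requestChannel, replicaManager, /* ... */)
requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel,
                                                 apis, config.numIoThreads)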

LogManager

Composition of a Kafka log

class LogManager(val logDirs: Array[File],
                 val topicConfigs: Map[String, LogConfig],
                 val defaultConfig: LogConfig,
                 val cleanerConfig: CleanerConfig,
                 ioThreads: Int,
                 val flushCheckMs: Long,
                 val flushCheckpointMs: Long,
                 val retentionCheckMs: Long,
                 scheduler: Scheduler,
                 val brokerState: BrokerState,
                 private val time: Time) extends Logging {
private val logs = new Pool[TopicAndPartition, Log]()
}
class Log(val dir: File,
          @volatile var config: LogConfig,
          @volatile var recoveryPoint: Long = 0L,
          scheduler: Scheduler,
          time: Time = SystemTime) extends Logging with KafkaMetricsGroup {
……
private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]
}

When a topic is created, the number of partitions can be specified, and each broker creates a Log for every partition of the topic assigned to it. Each Log consists of multiple LogSegments, and each LogSegment is keyed in the segments map by the offset of its first message, as shown in the figure:

[Figure: a Log made up of multiple LogSegments keyed by base offset]

A LogSegment is composed as follows:

class LogSegment(val log: FileMessageSet, 
                 val index: OffsetIndex, 
                 val baseOffset: Long, 
                 val indexIntervalBytes: Int,
                 val rollJitterMs: Long,
                 time: Time) extends Logging {
……
}

FileMessageSet reads a segment's data file by specifying start and end positions within the segment. OffsetIndex is the message index of the segment; it does not index every message, but instead adds one index entry roughly every log.index.interval.bytes bytes of appended messages, as illustrated below:

[Figure: sparse offset index over a log segment]

Therefore, looking up a record given a topic, partition and offset takes two steps:

1) Quickly locate the segment file

First determine which segment file the offset falls into. segments is a ConcurrentSkipListMap, i.e. a skip list:

[Figure: skip list over the segments' base offsets]

The skip list makes it fast to find which segment file contains the target offset, as in the sketch below.
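
A minimal sketch of step 1 (Kafka's Log.read does essentially this; floorEntry returns the entry with the largest base offset that is less than or equal to the target offset):

// step 1: locate the segment whose baseOffset <= targetOffset
val entry = segments.floorEntry(targetOffset)
val segment = if (entry == null) null else entry.getValue
// step 2 then searches segment.index (an OffsetIndex) for the position inside that segment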

2) Find the message chunk inside the segment file

Then a binary search over the sparse index locates the corresponding index slot:

class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSize: Int = -1) extends Logging {
/**
 * Find the slot in which the largest offset less than or equal to the given
 * target offset is stored.
 * 
 * @param idx The index buffer
 * @param targetOffset The offset to look for
 * 
 * @return The slot found or -1 if the least entry in the index is larger than the target offset or the index is empty
 */
private def indexSlotFor(idx: ByteBuffer, targetOffset: Long): Int = {
  // we only store the difference from the base offset so calculate that
  val relOffset = targetOffset - baseOffset
  // check if the index is empty
  if(entries == 0)
    return -1
  // check if the target offset is smaller than the least offset
  if(relativeOffset(idx, 0) > relOffset)
    return -1
  // binary search for the entry
  var lo = 0
  var hi = entries-1
  while(lo < hi) {
    val mid = ceil(hi/2.0 + lo/2.0).toInt
    val found = relativeOffset(idx, mid)
    if(found == relOffset)
      return mid
    else if(found < relOffset)
      lo = mid
    else
      hi = mid - 1
  }
  lo
}
}

Starting the LogManager

Step 1: rebuild the logs from each configured log directory.

class LogManager(val logDirs: Array[File],
                 val topicConfigs: Map[String, LogConfig],
                 val defaultConfig: LogConfig,
                 val cleanerConfig: CleanerConfig,
                 ioThreads: Int,
                 val flushCheckMs: Long,
                 val flushCheckpointMs: Long,
                 val retentionCheckMs: Long,
                 scheduler: Scheduler,
                 val brokerState: BrokerState,
                 private val time: Time) extends Logging {
  val RecoveryPointCheckpointFile = "recovery-point-offset-checkpoint"
  val LockFile = ".lock"
  val InitialTaskDelayMs = 30*1000
  private val logCreationOrDeletionLock = new Object
  private val logs = new Pool[TopicAndPartition, Log]()

  createAndValidateLogDirs(logDirs)
  private val dirLocks = lockLogDirs(logDirs)
  private val recoveryPointCheckpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, RecoveryPointCheckpointFile)))).toMap
  loadLogs()
……
}

recoveryPointCheckpoints records the latest flush position of every log, i.e. the offset up to which each topic partition's messages have been flushed to disk. The LogSegments and OffsetIndexes beyond that point may not have been flushed cleanly and need special recovery handling. An illustrative checkpoint file is shown below.
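
The checkpoint itself is a small text file, one per log directory. Its contents look roughly like this (illustrative example: a format-version line, an entry count, then one "topic partition offset" line per partition):

0
2
my-topic 0 120000
my-topic 1 119500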

Step 2: start three scheduled tasks and, optionally, the log cleaner

def startup() {
  /* Schedule the cleanup task to delete old logs */
  if(scheduler != null) {
    info("Starting log cleanup with a period of %d ms.".format(retentionCheckMs))
    // delete expired data and data exceeding the retention size
    scheduler.schedule("kafka-log-retention", 
                       cleanupLogs, 
                       delay = InitialTaskDelayMs, 
                       period = retentionCheckMs, 
                       TimeUnit.MILLISECONDS)
    info("Starting log flusher with a default period of %d ms.".format(flushCheckMs))
    // flush dirty data
    scheduler.schedule("kafka-log-flusher", 
                       flushDirtyLogs, 
                       delay = InitialTaskDelayMs, 
                       period = flushCheckMs, 
                       TimeUnit.MILLISECONDS)

    scheduler.schedule("kafka-recovery-point-checkpoint",
                       checkpointRecoveryPointOffsets,
                       delay = InitialTaskDelayMs,
                       period = flushCheckpointMs,
                       TimeUnit.MILLISECONDS)
  }
  // log cleaning: compact and merge several small log segments into one larger segment
  if(cleanerConfig.enableCleaner)
    cleaner.startup()
}

cleanupLogs: deletes expired data and data that exceeds the configured retention size

flushDirtyLogs: flushes dirty data to disk

checkpointRecoveryPointOffsets: checkpoints the logs' recovery points, which speeds up broker restart because only the data written after the checkpoint needs special recovery handling

cleaner: controlled by the log.cleaner.enable setting; log cleaning (compaction) keeps only the latest message for each key, as shown in the figure below (a config sketch follows the figure):

[Figure: log compaction keeping only the latest value per key]
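
The periods of these tasks and the cleaner switch come from ordinary broker settings. An illustrative server.properties fragment (property names as documented for 0.8.2-era brokers; values are examples only):

log.retention.hours=168                        # retention enforced by kafka-log-retention
log.retention.check.interval.ms=300000         # retentionCheckMs
log.flush.scheduler.interval.ms=3000           # flushCheckMs, period of kafka-log-flusher
log.flush.offset.checkpoint.interval.ms=60000  # flushCheckpointMs
log.cleaner.enable=true                        # turns on the log cleaner (compaction)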

ReplicaManager

First, two terms:
AR (assigned replicas): the full set of replicas assigned to a partition. ISR (in-sync replicas): the replicas currently in sync with the leader. See the figure below:

[Figure: AR and ISR of a partition]
Partition {
  topic                   : string       // topic name
  partition_id            : int          // partition id
  leader                  : Replica      // the leader replica of this partition, one of the replicas in the ISR
  ISR                     : Set[Replica] // the set of replicas currently in sync
  AR                      : Set[Replica] // all replicas assigned to this partition; a broker holds at most one replica per partition
  LeaderAndISRVersionInZK : long         // version id of the LeaderAndISR path; used for conditionally updating the LeaderAndISR path in ZK
}
Replica {                                // information about one partition replica
  broker_id               : int
  partition               : Partition    // the partition this replica belongs to
  log                     : Log          // the local log associated with this replica
  hw                      : long         // high watermark: offset of the last committed message
  leo                     : long         // log end offset
  isLeader                : Boolean      // whether this replica is the partition leader
}

Now let's look at ReplicaManager. Its role is to receive commands from the controller and carry out replica management; there are two kinds of commands, LeaderAndIsr and StopReplica. It therefore does three main things:

1) handle LeaderAndIsr commands; 2) handle StopReplica commands; 3) run the periodic maybeShrinkIsr task

Handling LeaderAndIsr commands
When the KafkaServer receives a LeaderAndIsrRequest, it calls ReplicaManager's becomeLeaderOrFollower function:

def becomeLeaderOrFollower(leaderAndISRRequest: LeaderAndIsrRequest,
                           offsetManager: OffsetManager): (collection.Map[(String, Int), Short], Short) = {
  leaderAndISRRequest.partitionStateInfos.foreach { case ((topic, partition), stateInfo) =>
    stateChangeLogger.trace("Broker %d received LeaderAndIsr request %s correlation id %d from controller %d epoch %d for partition [%s,%d]"
                              .format(localBrokerId, stateInfo, leaderAndISRRequest.correlationId,
                                      leaderAndISRRequest.controllerId, leaderAndISRRequest.controllerEpoch, topic, partition))
  }
  replicaStateChangeLock synchronized {
    val responseMap = new collection.mutable.HashMap[(String, Int), Short]
    if(leaderAndISRRequest.controllerEpoch < controllerEpoch) { // check the request's controller epoch
      leaderAndISRRequest.partitionStateInfos.foreach { case ((topic, partition), stateInfo) =>
      stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request from controller %d with correlation id %d since " +
        "its controller epoch %d is old. Latest known controller epoch is %d").format(localBrokerId, leaderAndISRRequest.controllerId,
        leaderAndISRRequest.correlationId, leaderAndISRRequest.controllerEpoch, controllerEpoch))
      }
      (responseMap, ErrorMapping.StaleControllerEpochCode)
    } else {
      val controllerId = leaderAndISRRequest.controllerId
      val correlationId = leaderAndISRRequest.correlationId
      controllerEpoch = leaderAndISRRequest.controllerEpoch

      // First check partition's leader epoch
      // the request-level epoch was checked above; now also check the leader epoch inside every partitionStateInfo
      val partitionState = new HashMap[Partition, PartitionStateInfo]()
      leaderAndISRRequest.partitionStateInfos.foreach{ case ((topic, partitionId), partitionStateInfo) =>
        val partition = getOrCreatePartition(topic, partitionId)
        val partitionLeaderEpoch = partition.getLeaderEpoch()
        // If the leader epoch is valid record the epoch of the controller that made the leadership decision.
        // This is useful while updating the isr to maintain the decision maker controller's epoch in the zookeeper path
        // the local partitionLeaderEpoch must be smaller than the leaderEpoch in the request, otherwise the request is stale
        if (partitionLeaderEpoch < partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leaderEpoch) {
          // check whether this partition is assigned to the current broker
          if(partitionStateInfo.allReplicas.contains(config.brokerId))
          // only partitions assigned to this broker go into partitionState; partition holds the current local state, partitionStateInfo the new state from the request
            partitionState.put(partition, partitionStateInfo)
          else {
            stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request from controller %d with correlation id %d " +
              "epoch %d for partition [%s,%d] as itself is not in assigned replica list %s")
              .format(localBrokerId, controllerId, correlationId, leaderAndISRRequest.controllerEpoch,
              topic, partition.partitionId, partitionStateInfo.allReplicas.mkString(",")))
          }
        } else {
          // Otherwise record the error code in response
          stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request from controller %d with correlation id %d " +
            "epoch %d for partition [%s,%d] since its associated leader epoch %d is old. Current leader epoch is %d")
            .format(localBrokerId, controllerId, correlationId, leaderAndISRRequest.controllerEpoch,
            topic, partition.partitionId, partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leaderEpoch, partitionLeaderEpoch))
          responseMap.put((topic, partitionId), ErrorMapping.StaleLeaderEpochCode)
        }
      }
      // core logic: decide for each partition whether this broker becomes leader or follower, then call makeLeaders / makeFollowers
      // in case (partition, partitionStateInfo), partition is the replicaManager's current view, while partitionStateInfo holds the new assignment from the request
      // select the partitions for which this broker becomes the leader
      val partitionsTobeLeader = partitionState
        .filter{ case (partition, partitionStateInfo) => partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader == config.brokerId}
      // select the partitions for which this broker becomes a follower
      val partitionsToBeFollower = (partitionState -- partitionsTobeLeader.keys)

      // if leader, run the become-leader flow
      if (!partitionsTobeLeader.isEmpty)
        makeLeaders(controllerId, controllerEpoch, partitionsTobeLeader, leaderAndISRRequest.correlationId, responseMap, offsetManager)
      // if follower, run the become-follower flow
      if (!partitionsToBeFollower.isEmpty)
        makeFollowers(controllerId, controllerEpoch, partitionsToBeFollower, leaderAndISRRequest.leaders, leaderAndISRRequest.correlationId, responseMap, offsetManager)

      // we initialize highwatermark thread after the first leaderisrrequest. This ensures that all the partitions
      // have been completely populated before starting the checkpointing there by avoiding weird race conditions
      if (!hwThreadInitialized) {
        // start the high-watermark checkpoint thread; the HW must be persisted to disk periodically so it can be reloaded after a failover
        startHighWaterMarksCheckPointThread()
        hwThreadInitialized = true
      }
      // shut down idle fetchers: a partition that just became leader no longer needs to fetch
      replicaFetcherManager.shutdownIdleFetcherThreads()
      (responseMap, ErrorMapping.NoError)
    }
  }
}

In short, it picks out the partition replicas assigned to this broker, splits them into leaders and followers depending on whether the new leader equals this brokerId, and then runs the corresponding flow for each group.

Entering makeLeaders:

private def makeLeaders(controllerId: Int, epoch: Int,
                        partitionState: Map[Partition, PartitionStateInfo],
                        correlationId: Int, responseMap: mutable.Map[(String, Int), Short],
                        offsetManager: OffsetManager) = {
  partitionState.foreach(state =>
    stateChangeLogger.trace(("Broker %d handling LeaderAndIsr request correlationId %d from controller %d epoch %d " +
      "starting the become-leader transition for partition %s")
      .format(localBrokerId, correlationId, controllerId, epoch, TopicAndPartition(state._1.topic, state._1.partitionId))))
  for (partition <- partitionState.keys)
    responseMap.put((partition.topic, partition.partitionId), ErrorMapping.NoError)
  try {
    // First stop fetchers for all the partitions
    // stop the fetcher threads for these partitions
    replicaFetcherManager.removeFetcherForPartitions(partitionState.keySet.map(new TopicAndPartition(_)))
    partitionState.foreach { state =>
      stateChangeLogger.trace(("Broker %d stopped fetchers as part of become-leader request from controller " +
        "%d epoch %d with correlation id %d for partition %s")
        .format(localBrokerId, controllerId, epoch, correlationId, TopicAndPartition(state._1.topic, state._1.partitionId)))
    }
    // Update the partition information to be the leader
    // update the Partition's state to become the leader
    partitionState.foreach{ case (partition, partitionStateInfo) =>
      partition.makeLeader(controllerId, partitionStateInfo, correlationId, offsetManager)}
  } catch {
    case e: Throwable =>
      partitionState.foreach { state =>
        val errorMsg = ("Error on broker %d while processing LeaderAndIsr request correlationId %d received from controller %d" +
          " epoch %d for partition %s").format(localBrokerId, correlationId, controllerId, epoch,
                                              TopicAndPartition(state._1.topic, state._1.partitionId))
        stateChangeLogger.error(errorMsg, e)
      }
      // Re-throw the exception for it to be caught in KafkaApis
      throw e
  }
  partitionState.foreach { state =>
    stateChangeLogger.trace(("Broker %d completed LeaderAndIsr request correlationId %d from controller %d epoch %d " +
      "for the become-leader transition for partition %s")
      .format(localBrokerId, correlationId, controllerId, epoch, TopicAndPartition(state._1.topic, state._1.partitionId)))
  }
}

Entering makeFollowers:

private def makeFollowers(controllerId: Int, epoch: Int, partitionState: Map[Partition, PartitionStateInfo],
                          leaders: Set[Broker], correlationId: Int, responseMap: mutable.Map[(String, Int), Short],
                          offsetManager: OffsetManager) {
  partitionState.foreach { state =>
    stateChangeLogger.trace(("Broker %d handling LeaderAndIsr request correlationId %d from controller %d epoch %d " +
      "starting the become-follower transition for partition %s")
      .format(localBrokerId, correlationId, controllerId, epoch, TopicAndPartition(state._1.topic, state._1.partitionId)))
  }
  for (partition <- partitionState.keys)
    responseMap.put((partition.topic, partition.partitionId), ErrorMapping.NoError)
  try {
    var partitionsToMakeFollower: Set[Partition] = Set()
    // TODO: Delete leaders from LeaderAndIsrRequest in 0.8.1
    partitionState.foreach{ case (partition, partitionStateInfo) =>
      val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch
      val newLeaderBrokerId = leaderIsrAndControllerEpoch.leaderAndIsr.leader
      leaders.find(_.id == newLeaderBrokerId) match { // only change partitions whose new leader is an available broker
        // Only change partition state when the leader is available
        case Some(leaderBroker) =>
          // makeFollower returns true only if the partition's leader actually changed; if it is unchanged, nothing needs to be done
          if (partition.makeFollower(controllerId, partitionStateInfo, correlationId, offsetManager))
            partitionsToMakeFollower += partition
          else
            stateChangeLogger.info(("Broker %d skipped the become-follower state change after marking its partition as follower with correlation id %d from " +
              "controller %d epoch %d for partition [%s,%d] since the new leader %d is the same as the old leader")
              .format(localBrokerId, correlationId, controllerId, leaderIsrAndControllerEpoch.controllerEpoch,
              partition.topic, partition.partitionId, newLeaderBrokerId))
        case None =>
          // The leader broker should always be present in the leaderAndIsrRequest.
          // If not, we should record the error message and abort the transition process for this partition
          stateChangeLogger.error(("Broker %d received LeaderAndIsrRequest with correlation id %d from controller" +
            " %d epoch %d for partition [%s,%d] but cannot become follower since the new leader %d is unavailable.")
            .format(localBrokerId, correlationId, controllerId, leaderIsrAndControllerEpoch.controllerEpoch,
            partition.topic, partition.partitionId, newLeaderBrokerId))
          // Create the local replica even if the leader is unavailable. This is required to ensure that we include
          // the partition's high watermark in the checkpoint file (see KAFKA-1647)
          partition.getOrCreateReplica()
      }
    }
    // since the leader has changed, remove the old fetcher: it still points at, and fetches from, the old leader
    replicaFetcherManager.removeFetcherForPartitions(partitionsToMakeFollower.map(new TopicAndPartition(_)))
    partitionsToMakeFollower.foreach { partition =>
      stateChangeLogger.trace(("Broker %d stopped fetchers as part of become-follower request from controller " +
        "%d epoch %d with correlation id %d for partition %s")
        .format(localBrokerId, controllerId, epoch, correlationId, TopicAndPartition(partition.topic, partition.partitionId)))
    }
    // since the leader has changed, data previously synced from the old leader may diverge from the new leader; data below the HW is consistent everywhere, so truncate everything above the HW to avoid inconsistency
    logManager.truncateTo(partitionsToMakeFollower.map(partition => (new TopicAndPartition(partition), partition.getOrCreateReplica().highWatermark.messageOffset)).toMap)
    partitionsToMakeFollower.foreach { partition =>
      stateChangeLogger.trace(("Broker %d truncated logs and checkpointed recovery boundaries for partition [%s,%d] as part of " +
        "become-follower request with correlation id %d from controller %d epoch %d").format(localBrokerId,
        partition.topic, partition.partitionId, correlationId, controllerId, epoch))
    }
    if (isShuttingDown.get()) {  // if the broker is really shutting down, do not add new fetchers
      partitionsToMakeFollower.foreach { partition =>
        stateChangeLogger.trace(("Broker %d skipped the adding-fetcher step of the become-follower state change with correlation id %d from " +
          "controller %d epoch %d for partition [%s,%d] since it is shutting down").format(localBrokerId, correlationId,
          controllerId, epoch, partition.topic, partition.partitionId))
      }
    }
    else {
      // we do not need to check if the leader exists again since this has been done at the beginning of this process
      val partitionsToMakeFollowerWithLeaderAndOffset = partitionsToMakeFollower.map(partition =>
        new TopicAndPartition(partition) -> BrokerAndInitialOffset(
          leaders.find(_.id == partition.leaderReplicaIdOpt.get).get,
          partition.getReplica().get.logEndOffset.messageOffset)).toMap
        // add new fetchers pointing at the new leaders
      replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset)
      partitionsToMakeFollower.foreach { partition =>
        stateChangeLogger.trace(("Broker %d started fetcher to new leader as part of become-follower request from controller " +
          "%d epoch %d with correlation id %d for partition [%s,%d]")
          .format(localBrokerId, controllerId, epoch, correlationId, partition.topic, partition.partitionId))
      }
    }
  } catch {
    case e: Throwable =>
      val errorMsg = ("Error on broker %d while processing LeaderAndIsr request with correlationId %d received from controller %d " +
        "epoch %d").format(localBrokerId, correlationId, controllerId, epoch)
      stateChangeLogger.error(errorMsg, e)
      // Re-throw the exception for it to be caught in KafkaApis
      throw e
  }

  partitionState.foreach { state =>
    stateChangeLogger.trace(("Broker %d completed LeaderAndIsr request correlationId %d from controller %d epoch %d " +
      "for the become-follower transition for partition %s")
      .format(localBrokerId, correlationId, controllerId, epoch, TopicAndPartition(state._1.topic, state._1.partitionId)))
  }
}

Handling StopReplica commands
When a broker is being stopped or a user deletes a replica, the KafkaServer receives a StopReplicaRequest and calls ReplicaManager's stopReplicas function:

def stopReplicas(stopReplicaRequest: StopReplicaRequest): (mutable.Map[TopicAndPartition, Short], Short) = {
  replicaStateChangeLock synchronized {
    val responseMap = new collection.mutable.HashMap[TopicAndPartition, Short]
    if(stopReplicaRequest.controllerEpoch < controllerEpoch) {
      stateChangeLogger.warn("Broker %d received stop replica request from an old controller epoch %d."
        .format(localBrokerId, stopReplicaRequest.controllerEpoch) +
        " Latest known controller epoch is %d " + controllerEpoch)
      (responseMap, ErrorMapping.StaleControllerEpochCode)
    } else {
      controllerEpoch = stopReplicaRequest.controllerEpoch
      // First stop fetchers for all partitions, then stop the corresponding replicas
      // first stop the fetcher threads of the affected partitions via the ReplicaFetcherManager
      replicaFetcherManager.removeFetcherForPartitions(stopReplicaRequest.partitions.map(r => TopicAndPartition(r.topic, r.partition)))
      for(topicAndPartition <- stopReplicaRequest.partitions){
        // then stop the replica of each topicAndPartition
        val errorCode = stopReplica(topicAndPartition.topic, topicAndPartition.partition, stopReplicaRequest.deletePartitions)
        responseMap.put(topicAndPartition, errorCode)
      }
      (responseMap, ErrorMapping.NoError)
    }
  }
}

In many cases stopReplica does not need to actually delete the replica, for example when the broker has simply gone down:

def stopReplica(topic: String, partitionId: Int, deletePartition: Boolean): Short  = {
  stateChangeLogger.trace("Broker %d handling stop replica (delete=%s) for partition [%s,%d]".format(localBrokerId,
    deletePartition.toString, topic, partitionId))
  val errorCode = ErrorMapping.NoError
  getPartition(topic, partitionId) match {
    case Some(partition) =>
      if(deletePartition) {  // the partition is really deleted only when deletePartition = true
        val removedPartition = allPartitions.remove((topic, partitionId))
        if (removedPartition != null)
          removedPartition.delete() // this will delete the local log
      }
    case None =>
      // Delete log and corresponding folders in case replica manager doesn't hold them anymore.
      // This could happen when topic is being deleted while broker is down and recovers.
      if(deletePartition) {
        val topicAndPartition = TopicAndPartition(topic, partitionId)

        if(logManager.getLog(topicAndPartition).isDefined) {
            logManager.deleteLog(topicAndPartition)
        }
      }
      stateChangeLogger.trace("Broker %d ignoring stop replica (delete=%s) for partition [%s,%d] as replica doesn't exist on broker"
        .format(localBrokerId, deletePartition, topic, partitionId))
  }
  stateChangeLogger.trace("Broker %d finished handling stop replica (delete=%s) for partition [%s,%d]"
    .format(localBrokerId, deletePartition, topic, partitionId))
  errorCode
}

The maybeShrinkIsr flow
At startup, a maybeShrinkIsr task is handed to the scheduler. It periodically checks how long each ISR member has gone without syncing and how many messages it is behind, in order to decide whether some replicas are no longer in sync.

def startup() {
  // start ISR expiration thread
  scheduler.schedule("isr-expiration", maybeShrinkIsr, period = config.replicaLagTimeMaxMs, unit = TimeUnit.MILLISECONDS)
}
private def maybeShrinkIsr(): Unit = {
  trace("Evaluating ISR list of partitions to see which replicas can be removed from the ISR")
  allPartitions.values.foreach(partition => partition.maybeShrinkIsr(config.replicaLagTimeMaxMs, config.replicaLagMaxMessages))
}
def maybeShrinkIsr(replicaMaxLagTimeMs: Long,  replicaMaxLagMessages: Long) {
  inWriteLock(leaderIsrUpdateLock) {
    leaderReplicaIfLocal() match {
      case Some(leaderReplica) =>
        // getOutOfSyncReplicas returns the replicas that are no longer in sync
        val outOfSyncReplicas = getOutOfSyncReplicas(leaderReplica, replicaMaxLagTimeMs, replicaMaxLagMessages)
        if(outOfSyncReplicas.size > 0) {
          val newInSyncReplicas = inSyncReplicas -- outOfSyncReplicas
          assert(newInSyncReplicas.size > 0)
          info("Shrinking ISR for partition [%s,%d] from %s to %s".format(topic, partitionId,
            inSyncReplicas.map(_.brokerId).mkString(","), newInSyncReplicas.map(_.brokerId).mkString(",")))
          // update ISR in zk and in cache
          updateIsr(newInSyncReplicas) // write the new ISR to ZooKeeper
          // we may need to increment high watermark since ISR could be down to 1
          maybeIncrementLeaderHW(leaderReplica)
          replicaManager.isrShrinkRate.mark()
        }
      case None => // do nothing if no longer leader
    }
  }
}
def getOutOfSyncReplicas(leaderReplica: Replica, keepInSyncTimeMs: Long, keepInSyncMessages: Long): Set[Replica] = {
  /**
   * there are two cases that need to be handled here -
   * 1. Stuck followers: If the leo of the replica hasn't been updated for keepInSyncTimeMs ms,
   *                     the follower is stuck and should be removed from the ISR
   * 2. Slow followers: If the leo of the slowest follower is behind the leo of the leader by keepInSyncMessages, the
   *                     follower is not catching up and should be removed from the ISR
   **/
  val leaderLogEndOffset = leaderReplica.logEndOffset
  val candidateReplicas = inSyncReplicas - leaderReplica
  // Case 1 above
  // logEndOffsetUpdateTimeMs is updated every time the follower fetches
  val stuckReplicas = candidateReplicas.filter(r => (time.milliseconds - r.logEndOffsetUpdateTimeMs) > keepInSyncTimeMs)
  if(stuckReplicas.size > 0)
    debug("Stuck replicas for partition [%s,%d] are %s".format(topic, partitionId, stuckReplicas.map(_.brokerId).mkString(",")))
  // Case 2 above 
  // check how many messages the follower is lagging behind
  val slowReplicas = candidateReplicas.filter(r =>
    r.logEndOffset.messageOffset >= 0 &&
    leaderLogEndOffset.messageOffset - r.logEndOffset.messageOffset > keepInSyncMessages)
  if(slowReplicas.size > 0)
    debug("Slow replicas for partition [%s,%d] are %s".format(topic, partitionId, slowReplicas.map(_.brokerId).mkString(",")))
  stuckReplicas ++ slowReplicas
}

OffsetManager

OffsetManager stores and serves consumer offsets. Kafka can manage topic offsets in two ways: 1) ZooKeeper, where offsets are committed to ZooKeeper; 2) Kafka, where offsets are committed inside Kafka itself. The choice is made by the offsets.storage parameter, which defaults to zookeeper. In other words, when offsets.storage=kafka is configured, Kafka turns the offset commit request into a produce request and appends it to the log of the topic "__consumer_offsets".
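
On the consumer side this is an ordinary setting of the old (Scala) consumer. A sketch (property names as documented for 0.8.2-era consumers; values are illustrative):

# consumer.properties (illustrative)
offsets.storage=kafka      # commit offsets to __consumer_offsets instead of ZooKeeper
dual.commit.enabled=true   # during migration, commit to both Kafka and ZooKeeper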

Looking at the OffsetManager class:

class OffsetManager(val config: OffsetManagerConfig,
                    replicaManager: ReplicaManager,
                    zkClient: ZkClient,
                    scheduler: Scheduler) extends Logging with KafkaMetricsGroup {

  /* offsets and metadata cache */
// offsetsCache serves offset lookups keyed by GroupTopicPartition
  private val offsetsCache = new Pool[GroupTopicPartition, OffsetAndMetadata]
  // periodically purge stale offsets: offsets that have not been updated for a long time suggest the consumer is no longer consuming, so they are removed from the cache and tombstoned in the offsets log
  scheduler.schedule(name = "offsets-cache-compactor",
                     fun = compact,
                     period = config.offsetsRetentionCheckIntervalMs,
                     unit = TimeUnit.MILLISECONDS)
……
}

It does two main things:
1) serve lookups of the offsets committed for a topic
2) append offset commit messages to the log of the topic named __consumer_offsets (see the sketch after this list)
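
Which partition of __consumer_offsets a consumer group's offsets land in is determined by hashing the group id. A sketch of that mapping (the real helper is OffsetManager.partitionFor; treat the exact expression as an approximation):

def partitionFor(group: String): Int =
  Utils.abs(group.hashCode) % config.offsetsTopicNumPartitions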

How offsetsCache is updated
So how is offsetsCache populated? Offset commits reach the leader of the corresponding partition as produce requests, and the leader keeps updating the cached offsets. There are three cases, depending on the required acks:

1) when produceRequest.requiredAcks == 0, i.e. no ack is required, putOffsets is called immediately to update the offsets
2) when produceRequest.requiredAcks == 1, i.e. a response is returned right away, putOffsets is likewise called immediately
3) when produceRequest.requiredAcks == -1, i.e. the batch must reach the minimum number of in-sync replicas, putOffsets is triggered through the ProducerRequestPurgatory (covered in a later section); a rough sketch of this dispatch follows
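
The sketch below paraphrases the ack-dependent behaviour described above; it is not the literal KafkaApis code, and putOffsets' real argument list lives in OffsetManager:

produceRequest.requiredAcks match {
  case 0 | 1 =>
    // no ack, or leader-only ack: the offset cache can be updated right away
    offsetManager.putOffsets(/* group, offsets -- hypothetical arguments */)
  case -1 =>
    // full-ISR ack: the request parks in ProducerRequestPurgatory, and
    // putOffsets runs only once enough replicas have the messages
}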

The compact mechanism
So how does compact work?

// remove stale OffsetAndMetadata entries from offsetsCache and append tombstones for them to the offsets log
private def compact() {
  debug("Compacting offsets cache.")
  val startMs = SystemTime.milliseconds
// filter out offsets that have not been updated for a long time
  val staleOffsets = offsetsCache.filter(startMs - _._2.timestamp > config.offsetsRetentionMs)
  debug("Found %d stale offsets (older than %d ms).".format(staleOffsets.size, config.offsetsRetentionMs))
  // delete the stale offsets from the table and generate tombstone messages to remove them from the log
  val tombstonesForPartition = staleOffsets.map { case(groupTopicAndPartition, offsetAndMetadata) =>
    val offsetsPartition = partitionFor(groupTopicAndPartition.group)
    trace("Removing stale offset and metadata for %s: %s".format(groupTopicAndPartition, offsetAndMetadata))
    offsetsCache.remove(groupTopicAndPartition)
    val commitKey = OffsetManager.offsetCommitKey(groupTopicAndPartition.group,
      groupTopicAndPartition.topicPartition.topic, groupTopicAndPartition.topicPartition.partition)
    (offsetsPartition, new Message(bytes = null, key = commitKey))
  }.groupBy{ case (partition, tombstone) => partition }
  // Append the tombstone messages to the offset partitions. It is okay if the replicas don't receive these (say,
  // if we crash or leaders move) since the new leaders will get rid of stale offsets during their own purge cycles.
  val numRemoved = tombstonesForPartition.flatMap { case(offsetsPartition, tombstones) =>
    val partitionOpt = replicaManager.getPartition(OffsetManager.OffsetsTopicName, offsetsPartition)
    partitionOpt.map { partition =>
      val appendPartition = TopicAndPartition(OffsetManager.OffsetsTopicName, offsetsPartition)
      val messages = tombstones.map(_._2).toSeq
      trace("Marked %d offsets in %s for deletion.".format(messages.size, appendPartition))
      try {
// append the tombstones to the leader's offsets log; this log is what loadOffsetsFromLog reads back when the broker restarts
        partition.appendMessagesToLeader(new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, messages:_*))
        tombstones.size
      }
      catch {
        case t: Throwable =>
          error("Failed to mark %d stale offsets for deletion in %s.".format(messages.size, appendPartition), t)
          // ignore and continue
          0
      }
    }
  }.sum
  debug("Removed %d stale offsets in %d milliseconds.".format(numRemoved, SystemTime.milliseconds - startMs))
}

In essence, the offsets of groups and topics that are no longer being written are purged from the cache and tombstoned in the offsets log, and on restart the leader can call loadOffsetsFromLog to reload the surviving offsets from disk.
