[TOC]
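Module composition
A broker consists of SocketServer (the socket service layer), KafkaRequestHandlerPool (the request dispatch layer), KafkaApis (the business logic layer), Controller (the cluster state control layer), KafkaHealthcheck (the broker health check layer), and TopicConfigManager (the topic configuration watch layer). See the figure below: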

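- SocketServer starts one Acceptor thread that accepts incoming socket connections and hands them to N Processor threads, where N = num.network.threads.
- The N Processors put the requests they receive into the blocking queue requestQueue.
- M I/O threads take requests from the request blocking queue requestQueue of RequestChannel and call KafkaApis to handle each kind of request, where M = num.io.threads.
- The broker handles 10 kinds of request: RequestKeys.ProduceKey, RequestKeys.FetchKey, RequestKeys.OffsetsKey, RequestKeys.MetadataKey, RequestKeys.LeaderAndIsrKey, RequestKeys.StopReplicaKey, RequestKeys.UpdateMetadataKey, RequestKeys.ControlledShutdownKey, RequestKeys.OffsetCommitKey, and RequestKeys.OffsetFetchKey.
- KafkaApis (the business logic layer) implements the business logic together with ReplicaManager (replica management), LogManager (log management), and OffsetManager (offset management).
- An I/O thread puts the response for a processed request into the response blocking queue responseQueues[i] of RequestChannel.
- Each Processor thread takes, from its own response blocking queue responseQueues[i], the responses to the requests it forwarded earlier and sends them to the client.
- Controller (the cluster state control layer) changes its own state through a ZooKeeper election. Only one broker in the cluster becomes the leader (the active controller). It is mainly responsible for topic creation and deletion, changes to a topic's partitions, changes to the replicas within a partition, and brokers going online or offline.
- KafkaHealthcheck (the broker health check layer) is implemented by registering an ephemeral path in ZooKeeper.
- TopicConfigManager (the topic configuration watch layer) mainly reacts to changes in topic configuration.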
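Sources of the requests a broker handles

A broker handles 10 kinds of request:
① ProducerRequest: sent by a producer to publish messages to the Kafka cluster, or used when a consumer's committed offsets are written to Kafka's log
② TopicMetadataRequest: sent by a producer or a consumer to fetch a topic's metadata
③ FetchRequest: sent by a consumer or by a ReplicaFetcherThread to fetch messages
④ OffsetRequest: sent by a consumer to look up the offsets of a topic
⑤ OffsetCommitRequest: sent by a consumer to commit offsets to Kafka (which, depending on configuration, stores them in ZooKeeper or in the log)
⑥ OffsetFetchRequest: sent by a consumer to read back the offsets it committed to Kafka (offsets kept in ZooKeeper are read by the consumer directly)
⑦ LeaderAndIsrRequest: sent by the Controller to the affected brokers when the leader or ISR of a topic partition changes (for example, when the leader fails)
⑧ StopReplicaRequest: sent by the Controller to tell the affected brokers to stop replicating, when a broker stops or when a replica of a topic partition is deleted
⑨ UpdateMetadataRequest: sent by the Controller to the affected brokers when topic metadata changes
⑩ ControlledShutdownRequest: received by the broker acting as controller when another broker in the cluster is about to shut down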
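The grouping of requests by sender can be written down as a small lookup table. This is only an illustrative sketch: the `Origin` type and the `RequestOrigins` object are hypothetical names introduced here, and the keys are the `RequestKeys` names listed earlier, used as plain strings rather than Kafka's real constants.

```scala
object RequestOrigins {
  // Hypothetical classification of who sends a request; not part of Kafka.
  sealed trait Origin
  case object Client extends Origin             // producers and consumers
  case object Follower extends Origin           // ReplicaFetcherThread on another broker
  case object Controller extends Origin         // the broker currently acting as controller
  case object ShuttingDownBroker extends Origin // a broker announcing its own shutdown

  // Request key name -> possible senders, following the list above.
  val origins: Map[String, Set[Origin]] = Map(
    "ProduceKey"            -> Set[Origin](Client),
    "MetadataKey"           -> Set[Origin](Client),
    "FetchKey"              -> Set[Origin](Client, Follower),
    "OffsetsKey"            -> Set[Origin](Client),
    "OffsetCommitKey"       -> Set[Origin](Client),
    "OffsetFetchKey"        -> Set[Origin](Client),
    "LeaderAndIsrKey"       -> Set[Origin](Controller),
    "StopReplicaKey"        -> Set[Origin](Controller),
    "UpdateMetadataKey"     -> Set[Origin](Controller),
    "ControlledShutdownKey" -> Set[Origin](ShuttingDownBroker)
  )

  /** All request keys that a given kind of sender may issue. */
  def sentBy(origin: Origin): Set[String] =
    origins.collect { case (name, who) if who.contains(origin) => name }.toSet
}
```

For example, `RequestOrigins.sentBy(RequestOrigins.Controller)` yields the three controller-to-broker requests, which are the ones ReplicaManager reacts to later in this article.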
SocketServer
class SocketServer(val brokerId: Int,
val host: String,
val port: Int,
val numProcessorThreads: Int,
val maxQueuedRequests: Int,
val sendBufferSize: Int,
val recvBufferSize: Int,
val maxRequestSize: Int = Int.MaxValue,
val maxConnectionsPerIp: Int = Int.MaxValue,
val connectionsMaxIdleMs: Long,
val maxConnectionsPerIpOverrides: Map[String, Int] ) extends Logging with KafkaMetricsGroup {
this.logIdent = "[Socket Server on Broker " + brokerId + "], "
private val time = SystemTime
private val processors = new Array[Processor](numProcessorThreads)
@volatile private var acceptor: Acceptor = null
val requestChannel = new RequestChannel(numProcessorThreads, maxQueuedRequests)
/* a meter to track the average free capacity of the network processors */
private val aggregateIdleMeter = newMeter("NetworkProcessorAvgIdlePercent", "percent", TimeUnit.NANOSECONDS)
/**
* Start the socket server
*/
def startup() {
val quotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides)
for(i <- 0 until numProcessorThreads) {// start num.network.threads Processor threads to handle network requests
processors(i) = new Processor(i,
time,
maxRequestSize,
aggregateIdleMeter,
newMeter("IdlePercent", "percent", TimeUnit.NANOSECONDS, Map("networkProcessor" -> i.toString)),
numProcessorThreads,
requestChannel,
quotas,
connectionsMaxIdleMs)
Utils.newThread("kafka-network-thread-%d-%d".format(port, i), processors(i), false).start()
}
newGauge("ResponsesBeingSent", new Gauge[Int] {
def value = processors.foldLeft(0) { (total, p) => total + p.countInterestOps(SelectionKey.OP_WRITE) }
})
// register the processor threads for notification of responses; the listener is called whenever a response is available
requestChannel.addResponseListener((id:Int) => processors(id).wakeup())
// start accepting connections
this.acceptor = new Acceptor(host, port, processors, sendBufferSize, recvBufferSize, quotas)
Utils.newThread("kafka-socket-acceptor", acceptor, false).start()
acceptor.awaitStartup
info("Started")
}
}
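Acceptor runs as a separate thread. When it accepts a network connection, it hands the connection, in round-robin order, to one of the Processor threads, which then handles all subsequent requests on that connection.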
private[kafka] class Acceptor(val host: String,
val port: Int,
private val processors: Array[Processor],
val sendBufferSize: Int,
val recvBufferSize: Int,
connectionQuotas: ConnectionQuotas) extends AbstractServerThread(connectionQuotas) {
val serverChannel = openServerSocket(host, port)
/**
* Accept loop that checks for new connection attempts
*/
def run() {
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
startupComplete()
var currentProcessor = 0
while(isRunning) {
val ready = selector.select(500)
if(ready > 0) {
val keys = selector.selectedKeys()
val iter = keys.iterator()
while(iter.hasNext && isRunning) {
var key: SelectionKey = null
try {
key = iter.next
iter.remove()
if(key.isAcceptable)
accept(key, processors(currentProcessor))// added to the Processor's newConnections; from now on this processor handles every request on the connection
else
throw new IllegalStateException("Unrecognized key state for acceptor thread.")
// round robin to the next processor thread
currentProcessor = (currentProcessor + 1) % processors.length
} catch {
case e: Throwable => error("Error while accepting connection", e)
}
}
}
}
debug("Closing server socket and selector.")
swallowError(serverChannel.close())
swallowError(selector.close())
shutdownComplete()
}
}
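The round-robin hand-off inside the accept loop can be isolated in a few lines. The sketch below is a simplified stand-in, not Kafka's Acceptor: `RoundRobin` and `RoundRobinDemo` are hypothetical names, and processors are represented by plain integers.

```scala
// Hypothetical stand-in for the Acceptor's hand-off logic; not Kafka's class.
final class RoundRobin[A](targets: IndexedSeq[A]) {
  require(targets.nonEmpty, "at least one target is required")
  private var current = 0

  /** Returns the next target and advances the cursor, wrapping at the end. */
  def next(): A = {
    val chosen = targets(current)
    current = (current + 1) % targets.length
    chosen
  }
}

object RoundRobinDemo {
  /** Assigns `connections` incoming connections to `processors` processor ids. */
  def assign(connections: Int, processors: Int): Seq[Int] = {
    val rr = new RoundRobin((0 until processors).toVector)
    (0 until connections).map(_ => rr.next())
  }
}
```

Because a connection stays with the processor that first received it, this only balances the number of connections per processor, not the request load each connection produces.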
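So how does a Processor thread handle requests? The key is requestChannel, which acts as the transport channel for requests and responses. Thanks to it, a Processor thread only receives requests from its connections and sends back the corresponding responses, and stays decoupled from the actual business logic. Here is RequestChannel: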
class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMetricsGroup {
private var responseListeners: List[(Int) => Unit] = Nil
private val requestQueue = new ArrayBlockingQueue[RequestChannel.Request](queueSize)// a single blocking queue of requests, consumed by the KafkaRequestHandler threads
private val responseQueues = new Array[BlockingQueue[RequestChannel.Response]](numProcessors)// num.network.threads blocking queues of responses, filled by the KafkaRequestHandler threads
for(i <- 0 until numProcessors)
responseQueues(i) = new LinkedBlockingQueue[RequestChannel.Response]()
}
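In other words, every Processor thread puts the requests from its own connections into the shared requestQueue, and then takes the responses to those requests from its own responseQueues(i), as shown in the figure below: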

The code is as follows:
private[kafka] class Processor(val id: Int,
val time: Time,
val maxRequestSize: Int,
val aggregateIdleMeter: Meter,
val idleMeter: Meter,
val totalProcessorThreads: Int,
val requestChannel: RequestChannel,
connectionQuotas: ConnectionQuotas,
val connectionsMaxIdleMs: Long) extends AbstractServerThread(connectionQuotas) {
override def run() {
startupComplete()
while(isRunning) {
// setup any new connections that have been queued up
configureNewConnections()
// register any new responses for writing
processNewResponses()// take responses from this processor's response queue
val startSelectTime = SystemTime.nanoseconds
val ready = selector.select(300)
currentTimeNanos = SystemTime.nanoseconds
val idleTime = currentTimeNanos - startSelectTime
idleMeter.mark(idleTime)
// We use a single meter for aggregate idle percentage for the thread pool.
// Since meter is calculated as total_recorded_value / time_window and
// time_window is independent of the number of threads, each recorded idle
// time should be discounted by # threads.
aggregateIdleMeter.mark(idleTime / totalProcessorThreads)
trace("Processor id " + id + " selection time = " + idleTime + " ns")
if(ready > 0) {
val keys = selector.selectedKeys()
val iter = keys.iterator()
while(iter.hasNext && isRunning) {
var key: SelectionKey = null
try {
key = iter.next
iter.remove()
if(key.isReadable)
read(key)// read a request from the connection
else if(key.isWritable)
write(key)// write the response to the corresponding request
else if(!key.isValid)
close(key)
else
throw new IllegalStateException("Unrecognized key state for processor thread.")
} catch {
case e: EOFException => {
info("Closing socket connection to %s.".format(channelFor(key).socket.getInetAddress))
close(key)
} case e: InvalidRequestException => {
info("Closing socket connection to %s due to invalid request: %s".format(channelFor(key).socket.getInetAddress, e.getMessage))
close(key)
} case e: Throwable => {
error("Closing socket for " + channelFor(key).socket.getInetAddress + " because of error", e)
close(key)
}
}
}
}
maybeCloseOldestConnection
}
debug("Closing selector.")
closeAll()
swallowError(selector.close())
shutdownComplete()
}
def read(key: SelectionKey) {
lruConnections.put(key, currentTimeNanos)
val socketChannel = channelFor(key)
var receive = key.attachment.asInstanceOf[Receive]
if(key.attachment == null) {
receive = new BoundedByteBufferReceive(maxRequestSize)
key.attach(receive)
}
val read = receive.readFrom(socketChannel)
val address = socketChannel.socket.getRemoteSocketAddress();
trace(read + " bytes read from " + address)
if(read < 0) {
close(key)
} else if(receive.complete) {
val req = RequestChannel.Request(processor = id, requestKey = key, buffer = receive.buffer, startTimeMs = time.milliseconds, remoteAddress = address)// assemble the request
requestChannel.sendRequest(req)// hand the request to requestChannel
key.attach(null)
// explicitly reset interest ops to not READ, no need to wake up the selector just yet
key.interestOps(key.interestOps & (~SelectionKey.OP_READ))
} else {
// more reading to be done
trace("Did not finish reading, registering for read again on connection " + socketChannel.socket.getRemoteSocketAddress())
key.interestOps(SelectionKey.OP_READ)
wakeup()
}
}
def write(key: SelectionKey) {
val socketChannel = channelFor(key)
val response = key.attachment().asInstanceOf[RequestChannel.Response]
val responseSend = response.responseSend// the payload of the response
if(responseSend == null)
throw new IllegalStateException("Registered for write interest but no response attached to key.")
val written = responseSend.writeTo(socketChannel)// write the response to the socket of this connection
trace(written + " bytes written to " + socketChannel.socket.getRemoteSocketAddress() + " using key " + key)
if(responseSend.complete) {
response.request.updateRequestMetrics()
key.attach(null)
trace("Finished writing, registering for read on connection " + socketChannel.socket.getRemoteSocketAddress())
key.interestOps(SelectionKey.OP_READ)
} else {
trace("Did not finish writing, registering for write again on connection " + socketChannel.socket.getRemoteSocketAddress())
key.interestOps(SelectionKey.OP_WRITE)
wakeup()
}
}
}
KafkaRequestHandlerPool
The logic of KafkaRequestHandlerPool is simple: it starts num.io.threads KafkaRequestHandler threads. Each KafkaRequestHandler takes requests from RequestChannel.requestQueue and puts the corresponding response into the responseQueues(i) queue.
class KafkaRequestHandlerPool(val brokerId: Int,
val requestChannel: RequestChannel,
val apis: KafkaApis,
numThreads: Int) extends Logging with KafkaMetricsGroup {
/* a meter to track the average free capacity of the request handlers */
private val aggregateIdleMeter = newMeter("RequestHandlerAvgIdlePercent", "percent", TimeUnit.NANOSECONDS)
this.logIdent = "[Kafka Request Handler on Broker " + brokerId + "], "
val threads = new Array[Thread](numThreads)
val runnables = new Array[KafkaRequestHandler](numThreads)
for(i <- 0 until numThreads) {// create num.io.threads KafkaRequestHandler instances
runnables(i) = new KafkaRequestHandler(i, brokerId, aggregateIdleMeter, numThreads, requestChannel, apis)
threads(i) = Utils.daemonThread("kafka-request-handler-" + i, runnables(i))
threads(i).start()
}
def shutdown() {
info("shutting down")
for(handler <- runnables)
handler.shutdown
for(thread <- threads)
thread.join
info("shut down completely")
}
}
class KafkaRequestHandler(id: Int,
brokerId: Int,
val aggregateIdleMeter: Meter,
val totalHandlerThreads: Int,
val requestChannel: RequestChannel,
apis: KafkaApis) extends Runnable with Logging {
this.logIdent = "[Kafka Request Handler " + id + " on Broker " + brokerId + "], "
def run() {
while(true) {
try {
var req : RequestChannel.Request = null
while (req == null) {
// We use a single meter for aggregate idle percentage for the thread pool.
// Since meter is calculated as total_recorded_value / time_window and
// time_window is independent of the number of threads, each recorded idle
// time should be discounted by # threads.
val startSelectTime = SystemTime.nanoseconds
req = requestChannel.receiveRequest(300)// take a request from RequestChannel.requestQueue
val idleTime = SystemTime.nanoseconds - startSelectTime
aggregateIdleMeter.mark(idleTime / totalHandlerThreads)
}
if(req eq RequestChannel.AllDone) {
debug("Kafka request handler %d on broker %d received shut down command".format(
id, brokerId))
return
}
req.requestDequeueTimeMs = SystemTime.milliseconds
trace("Kafka request handler %d on broker %d handling request %s".format(id, brokerId, req))
apis.handle(req)// let KafkaApis run the actual business logic; the response is then put into the matching RequestChannel.responseQueues[i]
} catch {
case e: Throwable => error("Exception when handling request", e)
}
}
}
def shutdown(): Unit = requestChannel.sendRequest(RequestChannel.AllDone)
}
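Putting the Processor side and the handler side together, the queue topology is one shared bounded request queue plus one response queue per processor. The following is a minimal sketch of that topology using only `java.util.concurrent`; `MiniRequestChannel`, its `Request` and `Response` case classes, and the upper-casing "business logic" are all assumptions made for illustration and do not mirror Kafka's actual types.

```scala
import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue, LinkedBlockingQueue, TimeUnit}

object MiniRequestChannel {
  // Simplified stand-ins for RequestChannel.Request / Response (assumed shapes).
  final case class Request(processor: Int, payload: String)
  final case class Response(processor: Int, payload: String)

  final class Channel(numProcessors: Int, queueSize: Int) {
    // One bounded queue shared by all processors and all handlers.
    private val requestQueue = new ArrayBlockingQueue[Request](queueSize)
    // One unbounded response queue per processor.
    private val responseQueues: Array[BlockingQueue[Response]] =
      Array.fill[BlockingQueue[Response]](numProcessors)(new LinkedBlockingQueue[Response]())

    def sendRequest(r: Request): Unit = requestQueue.put(r)
    def receiveRequest(timeoutMs: Long): Request = requestQueue.poll(timeoutMs, TimeUnit.MILLISECONDS)
    def sendResponse(r: Response): Unit = responseQueues(r.processor).put(r)
    def receiveResponse(processor: Int): Response = responseQueues(processor).poll()
  }

  /** Sends one request per processor through a single handler thread and collects the responses. */
  def roundTrip(numProcessors: Int): Seq[String] = {
    val channel = new Channel(numProcessors, queueSize = 16)
    val handler = new Thread(new Runnable {
      def run(): Unit = {
        var handled = 0
        while (handled < numProcessors) {
          val req = channel.receiveRequest(300) // null on timeout, as with a timed poll
          if (req != null) {
            // Route the response back to the queue of the processor that sent the request.
            channel.sendResponse(Response(req.processor, req.payload.toUpperCase))
            handled += 1
          }
        }
      }
    })
    handler.start()
    (0 until numProcessors).foreach(i => channel.sendRequest(Request(i, s"req-$i")))
    handler.join()
    (0 until numProcessors).map(i => channel.receiveResponse(i).payload)
  }
}
```

The bounded request queue gives back-pressure: when handlers fall behind, `sendRequest` blocks the network threads instead of letting requests pile up without limit. The processor id carried in each request is what lets any handler route the response to the right per-processor queue.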
LogManager
Structure of a Kafka log
class LogManager(val logDirs: Array[File],
val topicConfigs: Map[String, LogConfig],
val defaultConfig: LogConfig,
val cleanerConfig: CleanerConfig,
ioThreads: Int,
val flushCheckMs: Long,
val flushCheckpointMs: Long,
val retentionCheckMs: Long,
scheduler: Scheduler,
val brokerState: BrokerState,
private val time: Time) extends Logging {
private val logs = new Pool[TopicAndPartition, Log]()
}
class Log(val dir: File,
@volatile var config: LogConfig,
@volatile var recoveryPoint: Long = 0L,
scheduler: Scheduler,
time: Time = SystemTime) extends Logging with KafkaMetricsGroup {
……
private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]
}
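When a topic is created, the number of partitions can be specified, and each broker creates a log for every topic partition assigned to it. Each log consists of multiple LogSegments, and segments keys each LogSegment by the offset of its first message (its base offset), as shown in the figure:

A LogSegment is composed as follows: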
class LogSegment(val log: FileMessageSet,
val index: OffsetIndex,
val baseOffset: Long,
val indexIntervalBytes: Int,
val rollJitterMs: Long,
time: Time) extends Logging {
……
}
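FileMessageSet reads the messages of a segment by setting a start and an end position within the segment file. OffsetIndex is the message index of the segment. It does not index every message; instead it adds one index entry roughly every log.index.interval.bytes bytes of appended messages, as shown in the figure below: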

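Therefore, given a topic, a partition, and an offset, looking up a record takes two steps:
1) Quickly locate the segment file
First determine which segment file holds the offset. Because segments is a skip list backed by ConcurrentSkipListMap, that is:

the skip list quickly locates the segment file that contains the offset.
2) Find the message chunk within the segment file
Then a binary search over the sparse index finds the corresponding index entry: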
class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSize: Int = -1) extends Logging {
/**
* Find the slot in which the largest offset less than or equal to the given
* target offset is stored.
*
* @param idx The index buffer
* @param targetOffset The offset to look for
*
* @return The slot found or -1 if the least entry in the index is larger than the target offset or the index is empty
*/
private def indexSlotFor(idx: ByteBuffer, targetOffset: Long): Int = {
// we only store the difference from the base offset so calculate that
val relOffset = targetOffset - baseOffset
// check if the index is empty
if(entries == 0)
return -1
// check if the target offset is smaller than the least offset
if(relativeOffset(idx, 0) > relOffset)
return -1
// binary search for the entry
var lo = 0
var hi = entries-1
while(lo < hi) {
val mid = ceil(hi/2.0 + lo/2.0).toInt
val found = relativeOffset(idx, mid)
if(found == relOffset)
return mid
else if(found < relOffset)
lo = mid
else
hi = mid - 1
}
lo
}
}
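The two lookup steps can be combined into one small, self-contained sketch. It assumes a simplified in-memory model: `Segment` and `IndexEntry` are hypothetical case classes, and the sparse index is a `Vector` instead of the memory-mapped `ByteBuffer` the real OffsetIndex uses. Only `ConcurrentSkipListMap.floorEntry` is the actual JDK call that the segment lookup relies on.

```scala
import java.util.concurrent.ConcurrentSkipListMap

object OffsetLookupSketch {
  // Hypothetical sparse index entry: offset relative to the segment base -> byte position in the file.
  final case class IndexEntry(relativeOffset: Int, position: Int)
  final case class Segment(baseOffset: Long, index: Vector[IndexEntry])

  /** Step 1: floorEntry finds the segment with the largest base offset <= target. */
  def findSegment(segments: ConcurrentSkipListMap[java.lang.Long, Segment], target: Long): Option[Segment] =
    Option(segments.floorEntry(Long.box(target))).map(_.getValue)

  /** Step 2: binary search for the last index entry whose offset is <= target, or -1 if none. */
  def indexSlotFor(segment: Segment, target: Long): Int = {
    val rel = target - segment.baseOffset
    val idx = segment.index
    if (idx.isEmpty || idx(0).relativeOffset > rel) return -1
    var lo = 0
    var hi = idx.length - 1
    while (lo < hi) {
      val mid = lo + (hi - lo + 1) / 2 // round up so that lo = mid always makes progress
      val found = idx(mid).relativeOffset
      if (found == rel) return mid
      else if (found < rel) lo = mid
      else hi = mid - 1
    }
    lo
  }

  /** File position from which a linear scan for `target` would start. */
  def startPosition(segments: ConcurrentSkipListMap[java.lang.Long, Segment], target: Long): Option[Int] =
    findSegment(segments, target).map { seg =>
      val slot = indexSlotFor(seg, target)
      if (slot < 0) 0 else seg.index(slot).position
    }

  /** Two made-up segments with base offsets 0 and 100. */
  def demo(target: Long): Option[Int] = {
    val segments = new ConcurrentSkipListMap[java.lang.Long, Segment]()
    segments.put(Long.box(0L), Segment(0L, Vector(IndexEntry(0, 0), IndexEntry(40, 4096))))
    segments.put(Long.box(100L), Segment(100L, Vector(IndexEntry(0, 0), IndexEntry(35, 4100), IndexEntry(70, 8200))))
    startPosition(segments, target)
  }
}
```

Because the index is sparse, the slot found is only a starting point: the reader still scans forward from that file position until it reaches the exact offset. The midpoint is rounded up for the same reason as the `ceil` in the original code, since the loop assigns `lo = mid` and would otherwise never terminate when `hi == lo + 1`.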
Starting LogManager
Step 1: rebuild logs from each log directory configured for Kafka.
class LogManager(val logDirs: Array[File],
val topicConfigs: Map[String, LogConfig],
val defaultConfig: LogConfig,
val cleanerConfig: CleanerConfig,
ioThreads: Int,
val flushCheckMs: Long,
val flushCheckpointMs: Long,
val retentionCheckMs: Long,
scheduler: Scheduler,
val brokerState: BrokerState,
private val time: Time) extends Logging {
val RecoveryPointCheckpointFile = "recovery-point-offset-checkpoint"
val LockFile = ".lock"
val InitialTaskDelayMs = 30*1000
private val logCreationOrDeletionLock = new Object
private val logs = new Pool[TopicAndPartition, Log]()
createAndValidateLogDirs(logDirs)
private val dirLocks = lockLogDirs(logDirs)
private val recoveryPointCheckpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, RecoveryPointCheckpointFile)))).toMap
loadLogs()
……
}
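recoveryPointCheckpoints records the most recent flush position of every log, that is, the offset up to which the messages of each topic and partition have been flushed to disk. LogSegments and OffsetIndex entries after that position may not have been flushed cleanly and need special handling.
Step 2: start three scheduled tasks and, if enabled, the log cleaner.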
def startup() {
/* Schedule the cleanup task to delete old logs */
if(scheduler != null) {
info("Starting log cleanup with a period of %d ms.".format(retentionCheckMs))
// delete expired data and data beyond the retention size
scheduler.schedule("kafka-log-retention",
cleanupLogs,
delay = InitialTaskDelayMs,
period = retentionCheckMs,
TimeUnit.MILLISECONDS)
info("Starting log flusher with a default period of %d ms.".format(flushCheckMs))
// flush dirty data
scheduler.schedule("kafka-log-flusher",
flushDirtyLogs,
delay = InitialTaskDelayMs,
period = flushCheckMs,
TimeUnit.MILLISECONDS)
scheduler.schedule("kafka-recovery-point-checkpoint",
checkpointRecoveryPointOffsets,
delay = InitialTaskDelayMs,
period = flushCheckpointMs,
TimeUnit.MILLISECONDS)
}
// log compaction: retain the latest message per key, merging the cleaned segments
if(cleanerConfig.enableCleaner)
cleaner.startup()
}
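cleanupLogs: deletes data that has expired or that exceeds the retention size
flushDirtyLogs: flushes dirty data to disk
checkpointRecoveryPointOffsets: checkpoints each log, which speeds up broker restart because only the data after the checkpoint needs special handling
cleaner: log.cleaner.enable controls whether log compaction is turned on. Log compaction means compacting messages that share the same key, as shown in the figure below: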
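The effect of compaction by key can be sketched on an in-memory list of messages. This is a deliberately simplified model, not the cleaner's implementation: `Message` and `CompactionSketch` are hypothetical, and details such as delete markers or the part of the log that is not yet eligible for cleaning are ignored.

```scala
object CompactionSketch {
  // Hypothetical message shape; real messages carry byte arrays.
  final case class Message(offset: Long, key: String, value: String)

  /** Keeps, for every key, only the message with the highest offset. Offsets are not renumbered. */
  def compact(log: Seq[Message]): Seq[Message] = {
    val latestOffsetByKey: Map[String, Long] =
      log.groupBy(_.key).map { case (k, msgs) => k -> msgs.map(_.offset).max }
    log.filter(m => latestOffsetByKey(m.key) == m.offset)
  }
}
```

Note that surviving messages keep their original offsets, so the compacted log has gaps in its offset sequence while still preserving the relative order of what remains.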

ReplicaManager
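First, two terms:
AR (assigned replicas): the replicas assigned to a partition. ISR (in-sync replicas): the replicas that are currently in sync, as shown in the figure below: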

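Partition {
topic : string // topic name
partition_id : int // partition id
leader : Replica // the leader replica of this partition, one of the replicas in the ISR
ISR : Set[Replica] // the set of in-sync replicas
AR : Set[Replica] // all replicas assigned to this partition; a broker holds at most one replica of a partition
LeaderAndISRVersionInZK : long // version id of the LeaderAndISR path; used for conditionally update the LeaderAndISR path in ZK
}
Replica { // information about one partition replica
broker_id : int
partition : Partition // the partition this replica belongs to
log : Log // the local log associated with this replica
hw : long // offset of the last committed message
leo : long // log end offset
isLeader : Boolean // whether this replica is the leader of the partition
}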
Next, the main role of ReplicaManager: it receives commands from the controller and carries out replica management. There are two main commands, LeaderAndISRCommand and StopReplicaCommand, so it does three things:
1) handle LeaderAndISRCommand 2) handle StopReplicaCommand 3) run the periodic maybeShrinkIsr task
Handling LeaderAndISRCommand
When KafkaServer receives a LeaderAndIsrRequest, it calls ReplicaManager's becomeLeaderOrFollower function:
def becomeLeaderOrFollower(leaderAndISRRequest: LeaderAndIsrRequest,
offsetManager: OffsetManager): (collection.Map[(String, Int), Short], Short) = {
leaderAndISRRequest.partitionStateInfos.foreach { case ((topic, partition), stateInfo) =>
stateChangeLogger.trace("Broker %d received LeaderAndIsr request %s correlation id %d from controller %d epoch %d for partition [%s,%d]"
.format(localBrokerId, stateInfo, leaderAndISRRequest.correlationId,
leaderAndISRRequest.controllerId, leaderAndISRRequest.controllerEpoch, topic, partition))
}
replicaStateChangeLock synchronized {
val responseMap = new collection.mutable.HashMap[(String, Int), Short]
if(leaderAndISRRequest.controllerEpoch < controllerEpoch) { // check the controller epoch of the request
leaderAndISRRequest.partitionStateInfos.foreach { case ((topic, partition), stateInfo) =>
stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request from controller %d with correlation id %d since " +
"its controller epoch %d is old. Latest known controller epoch is %d").format(localBrokerId, leaderAndISRRequest.controllerId,
leaderAndISRRequest.correlationId, leaderAndISRRequest.controllerEpoch, controllerEpoch))
}
(responseMap, ErrorMapping.StaleControllerEpochCode)
} else {
val controllerId = leaderAndISRRequest.controllerId
val correlationId = leaderAndISRRequest.correlationId
controllerEpoch = leaderAndISRRequest.controllerEpoch
// First check partition's leader epoch
// the check above only covers the request's controller epoch; the leader epoch in each partitionStateInfo must be checked as well
val partitionState = new HashMap[Partition, PartitionStateInfo]()
leaderAndISRRequest.partitionStateInfos.foreach{ case ((topic, partitionId), partitionStateInfo) =>
val partition = getOrCreatePartition(topic, partitionId)
val partitionLeaderEpoch = partition.getLeaderEpoch()
// If the leader epoch is valid record the epoch of the controller that made the leadership decision.
// This is useful while updating the isr to maintain the decision maker controller's epoch in the zookeeper path
// the local partitionLeaderEpoch must be smaller than the leaderEpoch in the request; otherwise the request is stale
if (partitionLeaderEpoch < partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leaderEpoch) {
// check whether the partition is assigned to this broker
if(partitionStateInfo.allReplicas.contains(config.brokerId))
// only partitions assigned to this broker go into partitionState; partition is the current local state, partitionStateInfo is the latest state from the request
partitionState.put(partition, partitionStateInfo)
else {
stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request from controller %d with correlation id %d " +
"epoch %d for partition [%s,%d] as itself is not in assigned replica list %s")
.format(localBrokerId, controllerId, correlationId, leaderAndISRRequest.controllerEpoch,
topic, partition.partitionId, partitionStateInfo.allReplicas.mkString(",")))
}
} else {
// Otherwise record the error code in response
stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request from controller %d with correlation id %d " +
"epoch %d for partition [%s,%d] since its associated leader epoch %d is old. Current leader epoch is %d")
.format(localBrokerId, controllerId, correlationId, leaderAndISRRequest.controllerEpoch,
topic, partition.partitionId, partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leaderEpoch, partitionLeaderEpoch))
responseMap.put((topic, partitionId), ErrorMapping.StaleLeaderEpochCode)
}
}
// core logic: decide whether this broker becomes leader or follower, then call makeLeaders or makeFollowers
// in case (partition, partitionStateInfo), partition is ReplicaManager's current state and partitionStateInfo holds the new assignment from the request
// select partitionsTobeLeader
val partitionsTobeLeader = partitionState
.filter{ case (partition, partitionStateInfo) => partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader == config.brokerId}
// select partitionsToBeFollower
val partitionsToBeFollower = (partitionState -- partitionsTobeLeader.keys)
// run the leader transition for the partitions this broker will lead
if (!partitionsTobeLeader.isEmpty)
makeLeaders(controllerId, controllerEpoch, partitionsTobeLeader, leaderAndISRRequest.correlationId, responseMap, offsetManager)
// run the follower transition for the partitions this broker will follow
if (!partitionsToBeFollower.isEmpty)
makeFollowers(controllerId, controllerEpoch, partitionsToBeFollower, leaderAndISRRequest.leaders, leaderAndISRRequest.correlationId, responseMap, offsetManager)
// we initialize highwatermark thread after the first leaderisrrequest. This ensures that all the partitions
// have been completely populated before starting the checkpointing there by avoiding weird race conditions
if (!hwThreadInitialized) {
// start the HighWaterMarksCheckPointThread; the hw is important and must be persisted periodically so it can be reloaded after a failover
startHighWaterMarksCheckPointThread()
hwThreadInitialized = true
}
// shut down idle fetchers; a broker that became leader no longer needs to fetch
replicaFetcherManager.shutdownIdleFetcherThreads()
(responseMap, ErrorMapping.NoError)
}
}
}
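This mainly selects the partition replicas assigned to this broker and, depending on whether the leader is this brokerId, splits them into leaders and followers, which then go through different flows.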
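Stripped of logging and locking, the selection performed by becomeLeaderOrFollower comes down to three filters. The sketch below restates them over a flattened, hypothetical `PartitionState` case class; it also assumes that a partition not yet known locally has leader epoch -1, which is an assumption of this example rather than something shown in the code above.

```scala
object LeaderFollowerSplit {
  // Hypothetical, flattened view of one entry in a LeaderAndIsrRequest.
  final case class PartitionState(topic: String, partition: Int, leader: Int,
                                  leaderEpoch: Int, allReplicas: Set[Int])

  final case class Split(toLead: Set[(String, Int)], toFollow: Set[(String, Int)],
                         staleEpoch: Set[(String, Int)])

  def split(localBrokerId: Int,
            localLeaderEpochs: Map[(String, Int), Int],
            states: Seq[PartitionState]): Split = {
    def id(s: PartitionState): (String, Int) = (s.topic, s.partition)
    // 1. An entry is accepted only if its leader epoch is newer than the local one
    //    (unknown partitions are assumed to start at -1).
    val (fresh, stale) = states.partition(s => localLeaderEpochs.getOrElse(id(s), -1) < s.leaderEpoch)
    // 2. Entries for partitions not assigned to this broker are ignored.
    val assigned = fresh.filter(_.allReplicas.contains(localBrokerId))
    // 3. The rest is split by whether this broker is the designated leader.
    val (lead, follow) = assigned.partition(_.leader == localBrokerId)
    Split(lead.map(id).toSet, follow.map(id).toSet, stale.map(id).toSet)
  }
}
```

Stale entries correspond to the partitions that receive StaleLeaderEpochCode in the response map, while unassigned partitions are only logged and skipped.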
Entering makeLeaders:
private def makeLeaders(controllerId: Int, epoch: Int,
partitionState: Map[Partition, PartitionStateInfo],
correlationId: Int, responseMap: mutable.Map[(String, Int), Short],
offsetManager: OffsetManager) = {
partitionState.foreach(state =>
stateChangeLogger.trace(("Broker %d handling LeaderAndIsr request correlationId %d from controller %d epoch %d " +
"starting the become-leader transition for partition %s")
.format(localBrokerId, correlationId, controllerId, epoch, TopicAndPartition(state._1.topic, state._1.partitionId))))
for (partition <- partitionState.keys)
responseMap.put((partition.topic, partition.partitionId), ErrorMapping.NoError)
try {
// First stop fetchers for all the partitions
// remove the fetchers for these partitions
replicaFetcherManager.removeFetcherForPartitions(partitionState.keySet.map(new TopicAndPartition(_)))
partitionState.foreach { state =>
stateChangeLogger.trace(("Broker %d stopped fetchers as part of become-leader request from controller " +
"%d epoch %d with correlation id %d for partition %s")
.format(localBrokerId, controllerId, epoch, correlationId, TopicAndPartition(state._1.topic, state._1.partitionId)))
}
// Update the partition information to be the leader
// update the state held in each Partition
partitionState.foreach{ case (partition, partitionStateInfo) =>
partition.makeLeader(controllerId, partitionStateInfo, correlationId, offsetManager)}
} catch {
case e: Throwable =>
partitionState.foreach { state =>
val errorMsg = ("Error on broker %d while processing LeaderAndIsr request correlationId %d received from controller %d" +
" epoch %d for partition %s").format(localBrokerId, correlationId, controllerId, epoch,
TopicAndPartition(state._1.topic, state._1.partitionId))
stateChangeLogger.error(errorMsg, e)
}
// Re-throw the exception for it to be caught in KafkaApis
throw e
}
partitionState.foreach { state =>
stateChangeLogger.trace(("Broker %d completed LeaderAndIsr request correlationId %d from controller %d epoch %d " +
"for the become-leader transition for partition %s")
.format(localBrokerId, correlationId, controllerId, epoch, TopicAndPartition(state._1.topic, state._1.partitionId)))
}
}
Entering makeFollowers:
private def makeFollowers(controllerId: Int, epoch: Int, partitionState: Map[Partition, PartitionStateInfo],
leaders: Set[Broker], correlationId: Int, responseMap: mutable.Map[(String, Int), Short],
offsetManager: OffsetManager) {
partitionState.foreach { state =>
stateChangeLogger.trace(("Broker %d handling LeaderAndIsr request correlationId %d from controller %d epoch %d " +
"starting the become-follower transition for partition %s")
.format(localBrokerId, correlationId, controllerId, epoch, TopicAndPartition(state._1.topic, state._1.partitionId)))
}
for (partition <- partitionState.keys)
responseMap.put((partition.topic, partition.partitionId), ErrorMapping.NoError)
try {
var partitionsToMakeFollower: Set[Partition] = Set()
// TODO: Delete leaders from LeaderAndIsrRequest in 0.8.1
partitionState.foreach{ case (partition, partitionStateInfo) =>
val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch
val newLeaderBrokerId = leaderIsrAndControllerEpoch.leaderAndIsr.leader
leaders.find(_.id == newLeaderBrokerId) match {// only change partitions whose leader is an available broker
// Only change partition state when the leader is available
case Some(leaderBroker) =>
// makeFollower returns true only when the partition's leader actually changed; if it did not, nothing needs to be done
if (partition.makeFollower(controllerId, partitionStateInfo, correlationId, offsetManager))
partitionsToMakeFollower += partition
else
stateChangeLogger.info(("Broker %d skipped the become-follower state change after marking its partition as follower with correlation id %d from " +
"controller %d epoch %d for partition [%s,%d] since the new leader %d is the same as the old leader")
.format(localBrokerId, correlationId, controllerId, leaderIsrAndControllerEpoch.controllerEpoch,
partition.topic, partition.partitionId, newLeaderBrokerId))
case None =>
// The leader broker should always be present in the leaderAndIsrRequest.
// If not, we should record the error message and abort the transition process for this partition
stateChangeLogger.error(("Broker %d received LeaderAndIsrRequest with correlation id %d from controller" +
" %d epoch %d for partition [%s,%d] but cannot become follower since the new leader %d is unavailable.")
.format(localBrokerId, correlationId, controllerId, leaderIsrAndControllerEpoch.controllerEpoch,
partition.topic, partition.partitionId, newLeaderBrokerId))
// Create the local replica even if the leader is unavailable. This is required to ensure that we include
// the partition's high watermark in the checkpoint file (see KAFKA-1647)
partition.getOrCreateReplica()
}
}
// the leader has changed, so the old fetchers must be removed: they point at the old leader and fetch from it
replicaFetcherManager.removeFetcherForPartitions(partitionsToMakeFollower.map(new TopicAndPartition(_)))
partitionsToMakeFollower.foreach { partition =>
stateChangeLogger.trace(("Broker %d stopped fetchers as part of become-follower request from controller " +
"%d epoch %d with correlation id %d for partition %s")
.format(localBrokerId, controllerId, epoch, correlationId, TopicAndPartition(partition.topic, partition.partitionId)))
}
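// because the leader changed, data synced from the old leader may diverge from the new leader; data below the hw is consistent on all replicas, so truncate everything above the hw to avoid inconsistency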
logManager.truncateTo(partitionsToMakeFollower.map(partition => (new TopicAndPartition(partition), partition.getOrCreateReplica().highWatermark.messageOffset)).toMap)
partitionsToMakeFollower.foreach { partition =>
stateChangeLogger.trace(("Broker %d truncated logs and checkpointed recovery boundaries for partition [%s,%d] as part of " +
"become-follower request with correlation id %d from controller %d epoch %d").format(localBrokerId,
partition.topic, partition.partitionId, correlationId, controllerId, epoch))
}
if (isShuttingDown.get()) { // the broker is shutting down, so do not add fetchers
partitionsToMakeFollower.foreach { partition =>
stateChangeLogger.trace(("Broker %d skipped the adding-fetcher step of the become-follower state change with correlation id %d from " +
"controller %d epoch %d for partition [%s,%d] since it is shutting down").format(localBrokerId, correlationId,
controllerId, epoch, partition.topic, partition.partitionId))
}
}
else {
// we do not need to check if the leader exists again since this has been done at the beginning of this process
val partitionsToMakeFollowerWithLeaderAndOffset = partitionsToMakeFollower.map(partition =>
new TopicAndPartition(partition) -> BrokerAndInitialOffset(
leaders.find(_.id == partition.leaderReplicaIdOpt.get).get,
partition.getReplica().get.logEndOffset.messageOffset)).toMap
// add new fetchers that point at the new leader
replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset)
partitionsToMakeFollower.foreach { partition =>
stateChangeLogger.trace(("Broker %d started fetcher to new leader as part of become-follower request from controller " +
"%d epoch %d with correlation id %d for partition [%s,%d]")
.format(localBrokerId, controllerId, epoch, correlationId, partition.topic, partition.partitionId))
}
}
} catch {
case e: Throwable =>
val errorMsg = ("Error on broker %d while processing LeaderAndIsr request with correlationId %d received from controller %d " +
"epoch %d").format(localBrokerId, correlationId, controllerId, epoch)
stateChangeLogger.error(errorMsg, e)
// Re-throw the exception for it to be caught in KafkaApis
throw e
}
partitionState.foreach { state =>
stateChangeLogger.trace(("Broker %d completed LeaderAndIsr request correlationId %d from controller %d epoch %d " +
"for the become-follower transition for partition %s")
.format(localBrokerId, correlationId, controllerId, epoch, TopicAndPartition(state._1.topic, state._1.partitionId)))
}
}
Handling StopReplicaCommand
When a broker stops or a user deletes a replica, KafkaServer receives a StopReplicaRequest and calls ReplicaManager's stopReplicas function:
def stopReplicas(stopReplicaRequest: StopReplicaRequest): (mutable.Map[TopicAndPartition, Short], Short) = {
replicaStateChangeLock synchronized {
val responseMap = new collection.mutable.HashMap[TopicAndPartition, Short]
if(stopReplicaRequest.controllerEpoch < controllerEpoch) {
stateChangeLogger.warn("Broker %d received stop replica request from an old controller epoch %d."
.format(localBrokerId, stopReplicaRequest.controllerEpoch) +
" Latest known controller epoch is %d " + controllerEpoch)
(responseMap, ErrorMapping.StaleControllerEpochCode)
} else {
controllerEpoch = stopReplicaRequest.controllerEpoch
// First stop fetchers for all partitions, then stop the corresponding replicas
// first stop the fetchers of the affected partitions through the fetcher manager
replicaFetcherManager.removeFetcherForPartitions(stopReplicaRequest.partitions.map(r => TopicAndPartition(r.topic, r.partition)))
for(topicAndPartition <- stopReplicaRequest.partitions){
// then stop the replica of each topicAndPartition
val errorCode = stopReplica(topicAndPartition.topic, topicAndPartition.partition, stopReplicaRequest.deletePartitions)
responseMap.put(topicAndPartition, errorCode)
}
(responseMap, ErrorMapping.NoError)
}
}
}
In many cases stopReplica does not need to actually delete the replica, for example when a broker goes down:
def stopReplica(topic: String, partitionId: Int, deletePartition: Boolean): Short = {
stateChangeLogger.trace("Broker %d handling stop replica (delete=%s) for partition [%s,%d]".format(localBrokerId,
deletePartition.toString, topic, partitionId))
val errorCode = ErrorMapping.NoError
getPartition(topic, partitionId) match {
case Some(partition) =>
if(deletePartition) { // the partition is actually deleted only when deletePartition=true
val removedPartition = allPartitions.remove((topic, partitionId))
if (removedPartition != null)
removedPartition.delete() // this will delete the local log
}
case None =>
// Delete log and corresponding folders in case replica manager doesn't hold them anymore.
// This could happen when topic is being deleted while broker is down and recovers.
if(deletePartition) {
val topicAndPartition = TopicAndPartition(topic, partitionId)
if(logManager.getLog(topicAndPartition).isDefined) {
logManager.deleteLog(topicAndPartition)
}
}
stateChangeLogger.trace("Broker %d ignoring stop replica (delete=%s) for partition [%s,%d] as replica doesn't exist on broker"
.format(localBrokerId, deletePartition, topic, partitionId))
}
stateChangeLogger.trace("Broker %d finished handling stop replica (delete=%s) for partition [%s,%d]"
.format(localBrokerId, deletePartition, topic, partitionId))
errorCode
}
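The branches of stopReplica reduce to a small decision table over three facts: whether the replica manager still hosts the partition, whether a log for it exists on disk, and whether deletePartition is set. The sketch below restates that table; the Action type and the decide function are hypothetical names used only for illustration.

```scala
object StopReplicaSketch {
  sealed trait Action
  case object DeleteHostedPartition extends Action // remove from allPartitions and delete the local log
  case object DeleteOrphanLog extends Action       // partition unknown to the replica manager, log still on disk
  case object NoAction extends Action              // fetchers are already stopped, nothing is deleted

  // Hypothetical restatement of the branches in stopReplica.
  def decide(hosted: Boolean, logOnDisk: Boolean, deletePartition: Boolean): Action =
    if (!deletePartition) NoAction
    else if (hosted) DeleteHostedPartition
    else if (logOnDisk) DeleteOrphanLog
    else NoAction
}
```

The DeleteOrphanLog case corresponds to the situation described in the source comment: a topic was deleted while the broker was down, so the log exists on disk although the replica manager no longer holds the partition.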
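maybeShrinkIsr processing flow
At startup the ReplicaManager registers the maybeShrinkIsr task with the scheduler. The task periodically checks every replica in the ISR against two limits, a time lag (config.replicaLagTimeMaxMs) and a message lag (config.replicaLagMaxMessages), to decide whether some replicas have fallen out of sync.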
def startup() {
// start ISR expiration thread
scheduler.schedule("isr-expiration", maybeShrinkIsr, period = config.replicaLagTimeMaxMs, unit = TimeUnit.MILLISECONDS)
}
private def maybeShrinkIsr(): Unit = {
trace("Evaluating ISR list of partitions to see which replicas can be removed from the ISR")
allPartitions.values.foreach(partition => partition.maybeShrinkIsr(config.replicaLagTimeMaxMs, config.replicaLagMaxMessages))
}
def maybeShrinkIsr(replicaMaxLagTimeMs: Long, replicaMaxLagMessages: Long) {
inWriteLock(leaderIsrUpdateLock) {
leaderReplicaIfLocal() match {
case Some(leaderReplica) =>
// getOutOfSyncReplicas returns the replicas that are no longer in sync
val outOfSyncReplicas = getOutOfSyncReplicas(leaderReplica, replicaMaxLagTimeMs, replicaMaxLagMessages)
if(outOfSyncReplicas.size > 0) {
val newInSyncReplicas = inSyncReplicas -- outOfSyncReplicas
assert(newInSyncReplicas.size > 0)
info("Shrinking ISR for partition [%s,%d] from %s to %s".format(topic, partitionId,
inSyncReplicas.map(_.brokerId).mkString(","), newInSyncReplicas.map(_.brokerId).mkString(",")))
// update ISR in zk and in cache
updateIsr(newInSyncReplicas) // write the new ISR to ZooKeeper
// we may need to increment high watermark since ISR could be down to 1
maybeIncrementLeaderHW(leaderReplica)
replicaManager.isrShrinkRate.mark()
}
case None => // do nothing if no longer leader
}
}
}
def getOutOfSyncReplicas(leaderReplica: Replica, keepInSyncTimeMs: Long, keepInSyncMessages: Long): Set[Replica] = {
/**
* there are two cases that need to be handled here -
* 1. Stuck followers: If the leo of the replica hasn't been updated for keepInSyncTimeMs ms,
* the follower is stuck and should be removed from the ISR
* 2. Slow followers: If the leo of the slowest follower is behind the leo of the leader by keepInSyncMessages, the
* follower is not catching up and should be removed from the ISR
**/
val leaderLogEndOffset = leaderReplica.logEndOffset
val candidateReplicas = inSyncReplicas - leaderReplica
// Case 1 above
// logEndOffsetUpdateTimeMs is updated whenever the follower fetches
val stuckReplicas = candidateReplicas.filter(r => (time.milliseconds - r.logEndOffsetUpdateTimeMs) > keepInSyncTimeMs)
if(stuckReplicas.size > 0)
debug("Stuck replicas for partition [%s,%d] are %s".format(topic, partitionId, stuckReplicas.map(_.brokerId).mkString(",")))
// Case 2 above
// check how many messages the follower is behind
val slowReplicas = candidateReplicas.filter(r =>
r.logEndOffset.messageOffset >= 0 &&
leaderLogEndOffset.messageOffset - r.logEndOffset.messageOffset > keepInSyncMessages)
if(slowReplicas.size > 0)
debug("Slow replicas for partition [%s,%d] are %s".format(topic, partitionId, slowReplicas.map(_.brokerId).mkString(",")))
stuckReplicas ++ slowReplicas
}
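The two rules in getOutOfSyncReplicas can be tried out on plain data. The sketch below is a simplified model, not Kafka code: ReplicaView is a hypothetical stand-in for kafka.cluster.Replica, and the current time is passed in explicitly so the result is deterministic.

```scala
object IsrSketch {
  // Hypothetical, simplified view of a replica: id, log end offset,
  // and the time at which that log end offset was last updated.
  final case class ReplicaView(brokerId: Int, logEndOffset: Long, lastLeoUpdateMs: Long)

  def outOfSync(leader: ReplicaView,
                isr: Set[ReplicaView],
                nowMs: Long,
                maxLagTimeMs: Long,
                maxLagMessages: Long): Set[ReplicaView] = {
    val candidates = isr - leader
    // Case 1: stuck followers, whose log end offset has not moved for too long.
    val stuck = candidates.filter(r => nowMs - r.lastLeoUpdateMs > maxLagTimeMs)
    // Case 2: slow followers, which are too many messages behind the leader.
    val slow = candidates.filter(r =>
      r.logEndOffset >= 0 && leader.logEndOffset - r.logEndOffset > maxLagMessages)
    stuck ++ slow
  }
}
```

A follower is removed if either rule fires, so a replica that fetches regularly but never catches up is dropped just like one that has stopped fetching altogether.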
OffsetManager
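OffsetManager is mainly responsible for storing and reading offsets. Kafka can manage consumer offsets in two ways: 1) zookeeper, where offsets are committed to ZK; 2) kafka, where offsets are committed to Kafka itself. The choice is controlled by the offsets.storage parameter, which defaults to zookeeper. In other words, with offsets.storage=kafka, Kafka turns each OffsetCommitRequest into a produce request and stores it in the log of the topic "__consumer_offsets".
The OffsetManager class looks like this: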
class OffsetManager(val config: OffsetManagerConfig,
replicaManager: ReplicaManager,
zkClient: ZkClient,
scheduler: Scheduler) extends Logging with KafkaMetricsGroup {
/* offsets and metadata cache */
// offsetsCache serves lookups by GroupTopicPartition
private val offsetsCache = new Pool[GroupTopicPartition, OffsetAndMetadata]
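// periodically purge stale offsets: an offset that has not been updated for a long time suggests its consumer has stopped consuming, so the offset is no longer needed and its removal is written to the on-disk log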
scheduler.schedule(name = "offsets-cache-compactor",
fun = compact,
period = config.offsetsRetentionCheckIntervalMs,
unit = TimeUnit.MILLISECONDS)
……
}
It does two main things:
1) Serving queries for topic offsets
2) Writing offset messages to the log of the topic named __consumer_offsets
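How offsetsCache is updated
So how is offsetsCache populated? An offset commit reaches the leader as a produce request, and the leader updates the cached offset while handling that request. There are three cases:
1) When produceRequest.requiredAcks == 0, no ack is needed, so putOffsets is called immediately to update the offset
2) When produceRequest.requiredAcks == 1, the response must be returned right away, so putOffsets is called immediately to update the offset
3) When produceRequest.requiredAcks == -1, putOffsets is called only once the batch has reached the minimum number of replicas, triggered through ProducerRequestPurgatory (ProducerRequestPurgatory is covered in a later chapter)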
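The three cases above can be summarised as a single dispatch on requiredAcks. The sketch below only models the timing decision; PutOffsetsTiming and timing are hypothetical names, and requiredAcks is taken as an Int for simplicity.

```scala
object AckSketch {
  sealed trait PutOffsetsTiming
  case object Immediately extends PutOffsetsTiming      // cases 1 and 2
  case object AfterReplication extends PutOffsetsTiming // case 3, via the purgatory

  // Hypothetical helper: when is the offsets cache updated for a given ack setting?
  def timing(requiredAcks: Int): PutOffsetsTiming = requiredAcks match {
    case 0 | 1 => Immediately
    case -1    => AfterReplication
    case other =>
      // Values outside the three cases described in the text are not modelled here.
      throw new IllegalArgumentException(s"requiredAcks $other is not covered by this sketch")
  }
}
```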
The compact mechanism
So how does compact work?
// remove stale OffsetAndMetadata entries from offsetsCache and write tombstones for them to the log
private def compact() {
debug("Compacting offsets cache.")
val startMs = SystemTime.milliseconds
// select the offsets that have not been updated for a long time
val staleOffsets = offsetsCache.filter(startMs - _._2.timestamp > config.offsetsRetentionMs)
debug("Found %d stale offsets (older than %d ms).".format(staleOffsets.size, config.offsetsRetentionMs))
// delete the stale offsets from the table and generate tombstone messages to remove them from the log
val tombstonesForPartition = staleOffsets.map { case(groupTopicAndPartition, offsetAndMetadata) =>
val offsetsPartition = partitionFor(groupTopicAndPartition.group)
trace("Removing stale offset and metadata for %s: %s".format(groupTopicAndPartition, offsetAndMetadata))
offsetsCache.remove(groupTopicAndPartition)
val commitKey = OffsetManager.offsetCommitKey(groupTopicAndPartition.group,
groupTopicAndPartition.topicPartition.topic, groupTopicAndPartition.topicPartition.partition)
(offsetsPartition, new Message(bytes = null, key = commitKey))
}.groupBy{ case (partition, tombstone) => partition }
// Append the tombstone messages to the offset partitions. It is okay if the replicas don't receive these (say,
// if we crash or leaders move) since the new leaders will get rid of stale offsets during their own purge cycles.
val numRemoved = tombstonesForPartition.flatMap { case(offsetsPartition, tombstones) =>
val partitionOpt = replicaManager.getPartition(OffsetManager.OffsetsTopicName, offsetsPartition)
partitionOpt.map { partition =>
val appendPartition = TopicAndPartition(OffsetManager.OffsetsTopicName, offsetsPartition)
val messages = tombstones.map(_._2).toSeq
trace("Marked %d offsets in %s for deletion.".format(messages.size, appendPartition))
try {
// append to the on-disk log, which loadOffsetsFromLog reads when Kafka restarts
partition.appendMessagesToLeader(new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, messages:_*))
tombstones.size
}
catch {
case t: Throwable =>
error("Failed to mark %d stale offsets for deletion in %s.".format(messages.size, appendPartition), t)
// ignore and continue
0
}
}
}.sum
debug("Removed %d stale offsets in %d milliseconds.".format(numRemoved, SystemTime.milliseconds - startMs))
}
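In effect, compact removes from the cache the offsets that are no longer being updated and appends tombstone messages for them to the log on disk. When a leader restarts, it can call loadOffsetsFromLog to load the offsets back from disk.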
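The core of compact, filtering stale entries and grouping one tombstone per entry by offsets partition, can be sketched on an in-memory map. All types below are simplified, hypothetical stand-ins for the Kafka classes, and the partitionFor formula (non-negative hash modulo partition count) is an assumption made for illustration.

```scala
import scala.collection.mutable

object CompactSketch {
  final case class GroupTopicPartition(group: String, topic: String, partition: Int)
  final case class OffsetAndTimestamp(offset: Long, timestampMs: Long)
  // A tombstone is a keyed message with a null payload; only the key is modelled here.
  final case class Tombstone(key: GroupTopicPartition)

  // Assumed mapping from consumer group to a partition of the offsets topic.
  def partitionFor(group: String, numPartitions: Int): Int =
    (group.hashCode & 0x7fffffff) % numPartitions

  // Removes stale entries from the cache and returns the tombstones to append,
  // grouped by offsets-topic partition.
  def compact(cache: mutable.Map[GroupTopicPartition, OffsetAndTimestamp],
              nowMs: Long,
              retentionMs: Long,
              numPartitions: Int): Map[Int, Seq[Tombstone]] = {
    val staleKeys = cache.collect {
      case (key, value) if nowMs - value.timestampMs > retentionMs => key
    }.toList
    staleKeys.foreach(key => cache.remove(key))
    staleKeys
      .map(key => partitionFor(key.group, numPartitions) -> Tombstone(key))
      .groupBy(_._1)
      .map { case (partition, pairs) => partition -> pairs.map(_._2) }
  }
}
```

Grouping by partition mirrors the original code, which appends all tombstones for one offsets partition as a single message set.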