完整的网络通信架构

Kafka网络通信组件最核心两个部分:SocketServer和KafkaRequestHanderPool
SocketServer 组件是核心,主要实现了 Reactor 模式,用于处理外部多个 Clients(这里 的 Clients 指的是广义的 Clients,可能包含 Producer、Consumer 或其他 Broker)的并 发请求,并负责将处理结果封装进 Response 中,返还给 Clients。
KafkaRequestHandlerPool 组件就是我们常说的 I/O 线程池,里面定义了若干个 I/O 线 程,用于执行真实的请求处理逻辑。
SocketServer
SocketServer 是 Kafka 网络通信层中最重要的子模块。它下辖的 Acceptor 线程、Processor 线程和 RequestChannel 等对象,都是实施网络通信的重要组成部分。

- AbstractServerThread 类:这是 Acceptor 线程和 Processor 线程的抽象基类,定义 了这两个线程的公有方法,如 shutdown(关闭线程)等。
- Acceptor 线程类:这是接收和创建外部 TCP 连接的线程。每个 SocketServer 实例只会 创建一个 Acceptor 线程。它的唯一目的就是创建连接,并将接收到的 Request 传递给下 游的 Processor 线程处理。
- Processor 线程类:这是处理单个 TCP 连接上所有请求的线程。每个 SocketServer 实 例默认创建若干个(num.network.threads)Processor 线程。Processor 线程负责将接 收到的 Request 添加到 RequestChannel 的 Request 队列上,同时还负责将 Response 返还给 Request 发送方。
- Processor 伴生对象类:仅仅定义了一些与 Processor 线程相关的常见监控指标和常量 等,如 Processor 线程空闲率等。
- ConnectionQuotas 类:是控制连接数配额的类。我们能够设置单个 IP 创建 Broker 连 接的最大数量,以及单个 Broker 能够允许的最大连接数。
- TooManyConnectionsException 类:SocketServer 定义的一个异常类,用于标识连 接数配额超限情况。
- SocketServer 类:实现了对以上所有组件的管理和操作,如创建和关闭 Acceptor、 Processor 线程等。
Acceptor 线程
经典的 Reactor 模式有个 Dispatcher 的角色,接收外部请求并分发给下面的实际处理线
程。在 Kafka 中,这个 Dispatcher 就是 Acceptor 线程。
val sendBufferSize: Int,
val recvBufferSize: Int,
brokerId: Int,
connectionQuotas: ConnectionQuotas,
metricPrefix: String) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
// 创建底层的NIO Selector对象
// Selector对象负责执行底层实际I/O操作,如监听连接创建请求、读写请求等
private val nioSelector = NSelector.open()
// Broker端创建对应的ServerSocketChannel实例
// 后续把该Channel向上一步的Selector对象注册
val serverChannel = openServerSocket(endPoint.host, endPoint.port)
// 创建Processor线程池,实际上是Processor线程数组
private val processors = new ArrayBuffer[Processor]()
private val processorsStarted = new AtomicBoolean
private val blockedPercentMeter = newMeter(s"${metricPrefix}AcceptorBlockedPercent",
"blocked time", TimeUnit.NANOSECONDS, Map(ListenerMetricTag -> endPoint.listenerName.value))
}
Acceptor 代码中,提供了 3 个与 Processor 相关的方法,分别是 addProcessors、startProcessors 和 removeProcessors。

有了这三个方法,Acceptor 类就具备了基本的 Processor 线程池管理功能。不过,Acceptor 类逻辑的重头戏其实是 run 方法,它是处 理 Reactor 模式中分发逻辑的主要实现方法。
def run(): Unit = {
//注册OP_ACCEPT事件
serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
// 等待Acceptor线程启动完成
startupComplete()
try {
// 当前使用的Processor序号,从0开始,最大值是num.network.threads - 1
var currentProcessorIndex = 0
while (isRunning) {
try {
// 每500毫秒获取一次就绪I/O事件
val ready = nioSelector.select(500)
if (ready > 0) {// 如果有I/O事件准备就绪
val keys = nioSelector.selectedKeys()
val iter = keys.iterator()
while (iter.hasNext && isRunning) {
try {
val key = iter.next
iter.remove()
if (key.isAcceptable) {
// 调用accept方法创建Socket连接
accept(key).foreach { socketChannel =>
//
var retriesLeft = synchronized(processors.length)
var processor: Processor = null
do {
retriesLeft -= 1
// 指定由哪个Processor线程进行处理
processor = synchronized {
currentProcessorIndex = currentProcessorIndex % processors.length
processors(currentProcessorIndex)
}
// 更新Processor线程序号
currentProcessorIndex += 1
} while (!assignNewConnection(socketChannel, processor, retriesLeft == 0))
}
} else
throw new IllegalStateException("Unrecognized key state for acceptor thread.")
} catch {
case e: Throwable => error("Error while accepting connection", e)
}
}
}
}
catch {
case e: ControlThrowable => throw e
case e: Throwable => error("Error occurred", e)
}
}
} finally { // 执行各种资源关闭逻辑
debug("Closing server socket and selector.")
CoreUtils.swallow(serverChannel.close(), this, Level.ERROR)
CoreUtils.swallow(nioSelector.close(), this, Level.ERROR)
shutdownComplete()
}
}

基本上,Acceptor 线程使用 Java NIO 的 Selector + SocketChannel 的方式循环地轮询 准备就绪的 I/O 事件。这里的 I/O 事件,主要是指网络连接创建事件,即代码中的 SelectionKey.OP_ACCEPT。一旦接收到外部连接请求,Acceptor 就会指定一个 Processor 线程,并将该请求交由它,让它创建真正的网络连接。总的来说,Acceptor 线 程就做这么点事。
Processor 线程
如果说 Acceptor 是做入站连接处理的,那么,Processor 代码则是真正创建连接以及分 发请求的地方。显然,它要做的事情远比 Acceptor 要多得多。我先给出 Processor 线程 的 run 方法,你大致感受一下:
override def run(): Unit = {
startupComplete()// 等待Processor线程启动完成
try {
while (isRunning) {
try {
configureNewConnections()// 创建新连接
processNewResponses() 发送Response,并将Response放入到inflightResponse
poll()// 执行NIO poll,获取对应SocketChannel上准备就绪的I/O操作
processCompletedReceives()// 将接收到的Request放入Request队列
processCompletedSends()// 为临时Response队列中的Response执行回调逻辑
processDisconnected() // 处理因发送失败而导致的连接断开
closeExcessConnections() // 关闭超过配额限制部分的连接
} catch {
case e: Throwable => processException("Processor got uncaught exception.", e)
}
}
} finally { // 关闭底层资源
debug(s"Closing selector - processor $id")
CoreUtils.swallow(closeAll(), this, Level.ERROR)
shutdownComplete()
}
}


每个 Processor 线程在创建时都会创建 3 个队列。注意,这里的队列是广义的队列,其底 层使用的数据结构可能是阻塞队列,也可能是一个 Map 对象而已,如下所示:
private val newConnections = new ArrayBlockingQueue[SocketChannel](connectionQueueSize)
private val inflightResponses = mutable.Map[String, RequestChannel.Response]()
private val responseQueue = new LinkedBlockingDeque[RequestChannel.Response]()
队列一:newConnections
它保存的是要创建的新连接信息,具体来说,就是 SocketChannel 对象。这是一个默认上 限是 20 的队列,而且,目前代码中硬编码了队列的长度,因此,你无法变更这个队列的长 度。
每当 Processor 线程接收新的连接请求时,都会将对应的 SocketChannel 放入这个队列。 后面在创建连接时(也就是调用 configureNewConnections 时),就从该队列中取出 SocketChannel,然后注册新的连接。
队列二:inflightResponses
严格来说,这是一个临时 Response 队列。当 Processor 线程将 Response 返还给 Request 发送方之后,还要将 Response 放入这个临时队列。
为什么需要这个临时队列呢?这是因为,有些 Response 回调逻辑要在 Response 被发送 回发送方之后,才能执行,因此需要暂存在一个临时队列里面。这就是 inflightResponses 存在的意义。
队列三:responseQueue
看名字我们就可以知道,这是 Response 队列,而不是 Request 队列。这告诉了我们一个 事实:每个 Processor 线程都会维护自己的 Response 队列,而不是像网上的某些文章说 的,Response 队列是线程共享的或是保存在 RequestChannel 中的。Response 队列里 面保存着需要被返还给发送方的所有 Response 对象。
Processor 从底层 Socket 通道不断读取已接收到的网络请求,然 后转换成 Request 实例,并将其放入到 Request 队列。
总结

网友评论