参考
回顾
在kafka 启动1 入口函数中,我们阅读了KafkaServer的注释,这里直接总结一下:
- KafkaServer有两种请求层, data层或control层
- data层处理来自客户端和集群中其它broker的请求, control层处理来自controller的请求.
- 两种层都有Reactor线程(负责新连接)、Processor线程(各自维护一个Selector,并从Socket读取请求)、Handler线程(处理请求、生成响应返回给Processor线程做写操作)
- Reactor、Processor、Handler的比例在data层中是1:N:M, 在control层中是1:1:1
Processor的创建
Processor负责data-plane和control-plane的创建和启动,各自又包含了Acceptor、Processor和RequestChannel。详情阅读kafka请求全流程(一)—— 客户端请求的"SocketServer 定义"一章
请求流程
转载下kafka请求全流程(二)—— 请求的接收以及分发的图:
该图仅供参考,因为它有几个问题:1. 图中ResponseChannel可能笔误,应该为RequestChannel[1] 2. 并不是Processor将响应放到ResponseQueue,而是Processor检查响应队列并写出。 3. 图中没有讲述Selector的作用
另外,文中提到了PRODUCE请求, 此处可拓展阅读Kafka-处理请求(生产(PRODUCE)请求、获取(FETCH)请求)
上图流程描述了Kafka的网络模型,到KafkaApis就结束了。它之后的流程可以参考Kafka 源码解析之 Server 端如何处理 Produce 请求(十二)中的图:
过渡
上文提到了SocketServer、Acceptor、Processor、RequestChannel这几个网络通信组件,还给出了流程图。读过源码的同学应该很快能反应过来。如果没有,可以看下几章。
另外Processor有关网络IO的操作都是交给org.apache.kafka.common.network.Selector完成,而后者又会引出其它Kafka的IO组件,比如KafkaChannel、NetworkReceive、MemoryPool、TransportLayer等。
如果对每个组件硬读,可能难以消化。接下来请跟随我沿着某几个流程、切面进行分析,在这个过程中增加对每个组件的理解。
.1 处理新连接
我们先沿着"处理新连接"这个流程进行研究,看看一个新连接是如何被处理的。
新连接的处理涉及如下几个组件,Acceptor、Processor、org.apache.kafka.common.network.Selector、java.nio.channels.Selector
我们按图中步骤,结合源码逐个解说
① 接受新连接
Acceptor线程在主循环中,监听和接受新连接
接受新连接
交给Processor
Acceptor将SocketChannel交给Processor的队列变量newConnections
。
Processor::accept
② 注册SocketChannel
Processor线程在主循环中,调用configureNewConnections()
,把SocketChannel从队列取出并注册到Kafka Selector上。Kafka在Java nio的Selector类上封装了一层,也叫Selector[2]
注册到KafkaSelector上
③ 为SocketChannel注册事件
看下Kafka Selector的是如何注册SocketChannel的,看下registerChannel的实现
由下方两幅图可知,Kafka Selector在内部维护了一个java nio Selector。注册java SocketChannel,就是为其在java nio Selector上注册事件。
java nio注册
成员变量java nio Selector
.2 Kafka Selector执行IO
Processor利用Kafka Selector执行网络IO,因此我们要讲解下两者之间的交互。
Kafka Selector对java nio Selector进行了封装,将TCP的流式I/O转化为一个个对象[3]的I/O。TCP的I/O是面向流的,在读取时不能保证刚好完整读取了一个对象,但经过Kafka Selector的封装,外界可以将对象的I/O交给它,而不再需要关心一个对象完整I/O的逻辑。
Kafka Selector内部维护了一个java nio Selector,其核心函数是poll()
,每次执行都会进行网络I/O;它还维护了一些"List",每次执行poll
,这些变量都会有所更新。
Processor通过调用Selector的poll()
方法,再取出它更新的"List",从而完成与外界的通信。
Processor线程循环下有不少函数,我们聚焦网络I/O,只研究图中的这三个函数
①poll()
调用了Kafka Selector的poll方法,该方法会执行网络I/O
Selector执行poll()
后,很多"List"会更新,比如compeltedReceives和completedSends[4],分别代表在这轮poll()
中"完整接收到的请求"和"完整写出的响应"。
②processCompletedReceives()
该方法迭代了Selector的completedReceives变量,对每条完整收到的请求进行处理。
这个变量的作用在于,在每次调用poll()
后,会完整接收到一些的NetworkReceive。通过迭代该变量,可以处理每一条请求。
③processCompletedSends()
该方法迭代了Selector的completedSends变量,对每条完整写出的响应进行处理。
这个变量的作用在于,代码在将要写出的Send交给Selector后,其写出就交给Selector完成了,如果希望在Send写出完成后执行一些逻辑,就可以利用此变量。通过每次执行poll()
后迭代该变量,可以为每个完整写出的Send执行剩余的逻辑。
用图片可以形象地表示这个流程。
① Processor对Kafka Selector调用poll()
,执行网络I/O。
- Kafka Selector会读取每个触发了读事件的Socket,并将数据放到NetworkReceive中。如果有请求接收完成,就加入到completedReceives变量。
-
Selector将外界事先设置好的Send进行写出,如果写出完成,就加入到completedSend变量。
主流程
在这一章,我们只是笼统地介绍了Processor是如何与Selector交互的,但没有讲清楚Selector的poll()
是如何运作的,那些变量是如何被更新的。我们先在此打住,去关注读取到请求后的处理。
.3 请求的读取、处理与响应的写出
完整的请求被读取、处理后,生成响应并写出的过程如下:
请求的处理流程
我们跟随源码来印证这个过程
① 取出请求,交给队列
我们从processCompletedReceives继续。在调用poll()
后,从selector.completedReceives
中取出每个请求并处理。
在该方法中,会生成请求,并通过sendRequest把请求交给RequestChannel
交给队列
查看RequestChannel的实现可知,请求被放入了队列
sendRequest
② 取出请求,执行请求,生成响应
我们看KafkaRequestHandler的线程主循环,可知它从RequestChannel中取出请求,并交给KafkaApis执行。下方图二也显示了apis的类型是KafkaApis。
执行请求
从RequestChannel的实现可知,请求是从队列中取出的。
取出请求
从KafkaApis的实现可以看出,它根据请求的类型有不同的处理,此处我们不必研究具体的行为。
KafkaApis
不同的命令有不同的行为,是否发出响应/发出什么响应都是不同的。我们以PRODUCE命令为例,看看响应是如何生成的。
在该方法中定义了一个子方法sendResponseCallback,其内调用了sendResponse。sendResponse负责发回响应。在此响应被生成。
响应的生成
之后我们看sendResponse的实现,看看响应是如何被送回Processor的。
③ 将响应放入队列
我们看下sendResponse的实现,代码取出了对应的Processor并将响应入队
看下RequestChannel的实现
// RequestChannel.scala
/** Send a response back to the socket server to be sent over the network */
def sendResponse(response: RequestChannel.Response): Unit = {
if (isTraceEnabled) {
...
}
val processor = processors.get(response.processor)
// The processor may be null if it was shutdown. In this case, the connections
// are closed, so the response is dropped.
if (processor != null) {
processor.enqueueResponse(response)
}
}
从Processor的实现看出,响应被放入了队列
④ 取出响应,交给Selector写出
在Processor中,dequeueResponse方法会将响应出队
那么该方法在哪里调用呢?正是Processor主循环调用的其中一个方法,processNewResponses
迭代处理响应
所以Processor线程在主循环中会从responseQueue取出每个响应,并进行处理。
processNewResponses将响应取出后,调用sendResponse,交给Selector将响应发回客户端。
总结
本文讲述了请求从Selector被读取、执行,生成响应,并交由Selector写回客户端的过程。但本文略过了Selector的实现细节,下一篇文章会分析。
网友评论