在上一篇文章中kafka 网络模型1 请求响应流程,我分析了Kafka的请求、响应流程,但留下了Selector的疑点。本文会分析Selector和它的poll()
是如何进行网络IO的,NetworkReceive是如何被完整读取的,Send是如何被完整写出的,还会涉及到KafkaChannel和它的mute机制。
Selector
读完上一篇文章,我们应当理解了Kafka Selector在内部维护了一个java nio Selector,变量名叫nioSelector
。

外部向Kafka Selector注册SocketChannel,其实都是注册到了java Selector上。
而Kafka Selector又是通过调用java Selector,来收集触发了I/O事件的Socket,从而对其执行I/O。

poll
Processor线程在主循环调用Selector::poll,看下它的实现。方法一开始就调用了clear。

结合注释可知,clear的实现是很简单地清空各种成员变量,因为这些变量都是上一次
poll()
的结果,在这次poll()
之前就应该被处理过。[1]
Selector::poll的方法很长,我们看到它的方法主体,借助其内部维护的java Selector收集了触发I/O事件的SelectionKey,并调用pollSelectionKeys执行I/O。


pollSelectionKeys
收集到了要I/O的SelectionKey后,pollSelectionKeys要怎么做呢?
读者可以先自主思考: 现在收集到了一组触发了I/O事件的SelectionKey,它们可能触发了读事件/写事件,或者两者都触发了。那么我们要对这些SelectionKey逐一判断触发的事件,如果触发了读事件,就尝试把字节流读进NetworkReceive;如果触发了写事件,就尝试把Send的数据写出去。
没错,pollSelectionKeys就是这么做的。对每个SelectionKey,它首先取出它上面附着的KafkaChannel,以便之后要进行IO操作时,对其进行IO。


然后,根据条件判断调用attempRead,并调用attempWrite。
- 关于attempRead. 实际上,在PLAINTEXT下,
channel.ready()
始终为真,channel.hasBytesBufferd()
始终为假[2]。所以如果要对一个KafkaChannel执行读取,它必须:-
触发了读事件(
key.isReadable()
为真) - 没有已经完整读取的NetworkReceive(
hasCompletedReceive(channel)
为假)。如果有,你应当先处理完这个NetworkReceive,才能再读取。 - 而且该KafkaChannel不能被静音(
explicityMutedChannels.contains(channel)
为假,这是mute机制的内容,我们之后再研究)
-
触发了读事件(
- 关于attempWrite. 实际上在该方法的实现内也有条件判断,有四个判断。而在
PLAINTEXT下,channel.ready()
始终为真;第四个判断有点复杂,我们跳过不分析,当它为真。所以要对一个KafkaChannel执行写入,它必须:- 触发了写事件(第三个条件)
-
有Send可供写出(第一个条件)
Selector::attemptWrite
KafkaChannel:::hasSend
在条件满足时,attemptRead会被调用于读取NetworkReceive;在attemptWrite条件满足后,write会被调用于写出Send。我们看下attemptRead和write的实现, 不难发现规律:
- attemptRead:
- 先调用
channel.read()
尝试读取若干字节 - 再用
channel.maybeCompeleteReceive()
判断是否NetworkReceive读取完成 - 如果完成,加入到
completedReceives
队列
- 先调用
- write
- 先调用
channel.write()
尝试写出若干字节 - 再用
channel.maybeCompleteSend()
判断Send是否写出完成 - 如果完成,加入到
completedSends
队列
Selector::attempRead
Selector::write
- 先调用

此处可见,KafkaChannel最重要的方法是有关IO的read
、maybeCompleteReceive
、write
、maybeCompleteSend
。我们之后再看它们的实现
KafkaChannel
KafkaChannel是基于java SocketChannel上的一层封装(尽管它是利用java nio attachment机制附着在SocketChannel上的对象)。每个KafkaChannel代表一个客户端,一个KafkaSelector会管理多个KafkaChannel,并对其进行IO操作。

KafkaChannel的附着
上文提到"尽管它是利用java nio attachment机制附着在SocketChannel上的对象",KafkaChannel是如何被附着的呢?
我们回到Processor的源码,在主循环中的其中一个方法configureNewConnections中,将SocketChannel注册到Selector上

Selector会一路调用至buildAndAttachKafkaChannel,在此创建KafkaChannel并附着到SocketChannel上



KafkaChannel的结构
KafkaChannel中最重要的三个成员变量是TransportLayer、NetworkReceive和Send。KafkaChannel通过TransportLayer进行读写,读取NetworkReceive,写出Send。
- 每个NetworkReceive代表一个单独的请求,KafkaChannel读取的字节流会收纳到NetworkReceive中,当NetworkReceive读满,一个请求就完整读取了
-
每个Send代表一个单独的响应,需要写出响应时只需赋值此变量,之后调用write()方法将其中的字节流写出
如下图所示,KafkaChannel通过TransportLayer进行IO,而TransportLayer只是SocketChannel的一层封装:
TransportLayer
我们稍占篇幅用来解释"TransportLayer只是SocketChannel的一层封装"这一点。
首先从它的注释可见,该接口就是SocketChannel的一个封装,可以直接当做SocketChannel的替代。

TransportLayer和SocketChannel都继承了GatheringByteChannel和ScatteringByteChannel,因此能对ByteBuffer和ByteBuffer数组进行读写。


这个类只有两个实现,分别对应PLAINTEXT和SSL模式,两个实现都维护了对SocketChannel的引用。篇幅关系,这里就不看SslTransportLayer了。

总之,对TransportLayer调用的各种IO方法,在底层都是转交给SocketChannel完成的,所以我们可以把它当做SocketChannel一样使用。
IO
在上一章我们说到,Selector对客户端的IO在于attempRead和write,但后者又会对KafkaChannel调用read、maybeCompleteReceive、write、maybeCompleteSend,这些都是KafkaChannel有关IO的重要方法。
我们先看KafkaChannel是如何执行读取的,可知,读取和判断是否完成,与NetworkReceive::readFrom和NetworkReceive::complete有关。



再看KafkaChannel是如何执行写出的
- 首先调用setSend。设置send、注册写事件,以让write被调用
- 然后写出操作要调用Send::writeTo
-
判断是否写出完成与Send::completed有关。写出完成时注销写事件。
可见KafkaChannel的读写、判断是否完成,与NetworkReceive::readFrom、NetworkReceive::complete、Send::writeTo、Send::completed有关。我们要分析这两个类的读写行为。
NetworkReceive和Send
NetworkReceive
结构如下,NetworkReceive包含两个ByteBuffer,叫做size和buffer。
- size的大小为4字节,存储了buffer的字节数
- buffer存储了具体的指令内容
在读取时,先读取4字节到size内,再根据size指示的大小为buffer分配内存,然后读满整个buffer时,NetworkReceive就读取完成了。由于缓存的大小清晰,能够避免"tcp粘包"问题

从构造函数中看出,size是固定4字节的

readFrom方法负责读取size和buffer,由于该方法可能被多次调用,每次都需要判断size和buffer的状态,并读取。


complete方法判断是否读取完成,也就是size和buffer是否都读满了

Send
Send是一个接口,含有completed和writeTo方法。有三个类/抽象类实现了writeTo方法。注释中强调了该方法可能会被调动多次才写出完成,因此其实现都遵循了这一点。

以ByteBufferSend为例,在构造函数中,计算了remaining,要写出的剩余字节数

writeTo方法负责写出一组ByteBuffer

completed方法会判断remaining是否不大于0(在PLAINTEXT下,pending始终为false)
mute机制
在读取一个请求后,mute
写出一个响应后,unmute
这样做是为了使得每个请求一来一回,有序排队

网友评论