美文网首页
kafka 网络模型2 Selector

kafka 网络模型2 Selector

作者: 不存在的里皮 | 来源:发表于2020-06-06 05:40 被阅读0次

在上一篇文章中kafka 网络模型1 请求响应流程,我分析了Kafka的请求、响应流程,但留下了Selector的疑点。本文会分析Selector和它的poll()是如何进行网络IO的,NetworkReceive是如何被完整读取的,Send是如何被完整写出的,还会涉及到KafkaChannel和它的mute机制。

Selector

读完上一篇文章,我们应当理解了Kafka Selector在内部维护了一个java nio Selector,变量名叫nioSelector


外部向Kafka Selector注册SocketChannel,其实都是注册到了java Selector上。
而Kafka Selector又是通过调用java Selector,来收集触发了I/O事件的Socket,从而对其执行I/O。
图中橙色的圆圈代表触发了I/O事件的Socket

poll

Processor线程在主循环调用Selector::poll,看下它的实现。方法一开始就调用了clear。


结合注释可知,clear的实现是很简单地清空各种成员变量,因为这些变量都是上一次poll()的结果,在这次poll()之前就应该被处理过。[1]

Selector::poll的方法很长,我们看到它的方法主体,借助其内部维护的java Selector收集了触发I/O事件的SelectionKey,并调用pollSelectionKeys执行I/O。



pollSelectionKeys

收集到了要I/O的SelectionKey后,pollSelectionKeys要怎么做呢?

读者可以先自主思考: 现在收集到了一组触发了I/O事件的SelectionKey,它们可能触发了读事件/写事件,或者两者都触发了。那么我们要对这些SelectionKey逐一判断触发的事件,如果触发了读事件,就尝试把字节流读进NetworkReceive;如果触发了写事件,就尝试把Send的数据写出去。

没错,pollSelectionKeys就是这么做的。对每个SelectionKey,它首先取出它上面附着的KafkaChannel,以便之后要进行IO操作时,对其进行IO。



然后,根据条件判断调用attempRead,并调用attempWrite。

  • 关于attempRead. 实际上,在PLAINTEXT下,channel.ready()始终为真,channel.hasBytesBufferd()始终为假[2]。所以如果要对一个KafkaChannel执行读取,它必须:
    • 触发了读事件(key.isReadable()为真)
    • 没有已经完整读取的NetworkReceive(hasCompletedReceive(channel)为假)。如果有,你应当先处理完这个NetworkReceive,才能再读取。
    • 而且该KafkaChannel不能被静音(explicityMutedChannels.contains(channel)为假,这是mute机制的内容,我们之后再研究)
  • 关于attempWrite. 实际上在该方法的实现内也有条件判断,有四个判断。而在
    PLAINTEXT下,channel.ready()始终为真;第四个判断有点复杂,我们跳过不分析,当它为真。所以要对一个KafkaChannel执行写入,它必须:
    • 触发了写事件(第三个条件)
    • 有Send可供写出(第一个条件)




      Selector::attemptWrite
      KafkaChannel:::hasSend

在条件满足时,attemptRead会被调用于读取NetworkReceive;在attemptWrite条件满足后,write会被调用于写出Send。我们看下attemptRead和write的实现, 不难发现规律:

  • attemptRead:
    1. 先调用channel.read()尝试读取若干字节
    2. 再用channel.maybeCompeleteReceive()判断是否NetworkReceive读取完成
    3. 如果完成,加入到completedReceives队列
  • write
    1. 先调用channel.write()尝试写出若干字节
    2. 再用channel.maybeCompleteSend()判断Send是否写出完成
    3. 如果完成,加入到completedSends队列
      Selector::attempRead
      Selector::write
addToCompletedReceives

此处可见,KafkaChannel最重要的方法是有关IOreadmaybeCompleteReceivewritemaybeCompleteSend。我们之后再看它们的实现

KafkaChannel

KafkaChannel是基于java SocketChannel上的一层封装(尽管它是利用java nio attachment机制附着在SocketChannel上的对象)。每个KafkaChannel代表一个客户端,一个KafkaSelector会管理多个KafkaChannel,并对其进行IO操作。


KafkaChannel的附着
上文提到"尽管它是利用java nio attachment机制附着在SocketChannel上的对象",KafkaChannel是如何被附着的呢?
我们回到Processor的源码,在主循环中的其中一个方法configureNewConnections中,将SocketChannel注册到Selector上


Selector会一路调用至buildAndAttachKafkaChannel,在此创建KafkaChannel并附着到SocketChannel上


KafkaChannel的结构
KafkaChannel中最重要的三个成员变量是TransportLayer、NetworkReceive和Send。KafkaChannel通过TransportLayer进行读写,读取NetworkReceive,写出Send。

  • 每个NetworkReceive代表一个单独的请求,KafkaChannel读取的字节流会收纳到NetworkReceive中,当NetworkReceive读满,一个请求就完整读取了
  • 每个Send代表一个单独的响应,需要写出响应时只需赋值此变量,之后调用write()方法将其中的字节流写出



    如下图所示,KafkaChannel通过TransportLayer进行IO,而TransportLayer只是SocketChannel的一层封装:


TransportLayer

我们稍占篇幅用来解释"TransportLayer只是SocketChannel的一层封装"这一点。
首先从它的注释可见,该接口就是SocketChannel的一个封装,可以直接当做SocketChannel的替代。


TransportLayer和SocketChannel都继承了GatheringByteChannel和ScatteringByteChannel,因此能对ByteBuffer和ByteBuffer数组进行读写

SocketChannel

这个类只有两个实现,分别对应PLAINTEXT和SSL模式,两个实现都维护了对SocketChannel的引用。篇幅关系,这里就不看SslTransportLayer了。


总之,对TransportLayer调用的各种IO方法,在底层都是转交给SocketChannel完成的,所以我们可以把它当做SocketChannel一样使用。

IO

在上一章我们说到,Selector对客户端的IO在于attempRead和write,但后者又会对KafkaChannel调用read、maybeCompleteReceive、write、maybeCompleteSend,这些都是KafkaChannel有关IO的重要方法。

我们先看KafkaChannel是如何执行读取的,可知,读取和判断是否完成,与NetworkReceive::readFrom和NetworkReceive::complete有关。





再看KafkaChannel是如何执行写出的

  • 首先调用setSend。设置send、注册写事件,以让write被调用
  • 然后写出操作要调用Send::writeTo
  • 判断是否写出完成与Send::completed有关。写出完成时注销写事件。





    可见KafkaChannel的读写、判断是否完成,与NetworkReceive::readFrom、NetworkReceive::complete、Send::writeTo、Send::completed有关。我们要分析这两个类的读写行为。

NetworkReceive和Send

NetworkReceive
结构如下,NetworkReceive包含两个ByteBuffer,叫做size和buffer。

  • size的大小为4字节,存储了buffer的字节数
  • buffer存储了具体的指令内容

在读取时,先读取4字节到size内,再根据size指示的大小为buffer分配内存,然后读满整个buffer时,NetworkReceive就读取完成了。由于缓存的大小清晰,能够避免"tcp粘包"问题


从构造函数中看出,size是固定4字节的



readFrom方法负责读取size和buffer,由于该方法可能被多次调用,每次都需要判断size和buffer的状态,并读取。




complete方法判断是否读取完成,也就是size和buffer是否都读满了

Send
Send是一个接口,含有completed和writeTo方法。有三个类/抽象类实现了writeTo方法。注释中强调了该方法可能会被调动多次才写出完成,因此其实现都遵循了这一点。

以ByteBufferSend为例,在构造函数中,计算了remaining,要写出的剩余字节数



writeTo方法负责写出一组ByteBuffer



completed方法会判断remaining是否不大于0(在PLAINTEXT下,pending始终为false)

mute机制

在读取一个请求后,mute
写出一个响应后,unmute
这样做是为了使得每个请求一来一回,有序排队


  1. 比如completedReceives是上一次poll()中收到的请求,在这次poll()调用前就应当被处理过,所以这一次调用就应该清空。换句话说,如果这次不清空,那之后Processor就会重复处理这些请求了。

  2. 读者可自行点击查看这两个方法在PLAINTEXT下的实现,此处就不占用篇幅了。

相关文章

  • kafka 网络模型2 Selector

    在上一篇文章中kafka 网络模型1 请求响应流程,我分析了Kafka的请求、响应流程,但留下了Selector的...

  • Kafka源码分析-Content Table

    Kafka源码分析-网络层-1 Kafka源码分析-网络层-2 Kafka源码分析-网络层-3 Kafka源码分析...

  • kafka之网络模型总结

    弄清楚kafka的网络模型原理,能很好的帮助理解和优化kafka服务。kafka底层的网络通信,没有使用第三方rp...

  • kafka基础

    kafka的分区消费模型 分区消费模型是kafka的消费者编程模型。其模型如下所示: 主要是一个consumer对...

  • 二、Kafka基础实战:消费者和生产者实例

    一、Kafka消费者编程模型 1.分区消费模型 分区消费伪代码描述 2.组(Group)消费模型 按组(Group...

  • 使用CocoaAsyncSocket实现socket编程

    目录 一、网络七层模型及五层模型 1、网络七层模型 2、网络五层模型 二、各种协议 1、IP协议 2、TCP协议与...

  • 聊聊 Kafka: Producer 的网络模型

    一、Producer 的网络模型 我们前面几篇有说 Producer 发送流程的源码分析,但那个是大的轮廓,涉及到...

  • spring kafka 集成

    基础配置 1.mvn 坐标 2.kafka 配置文件 1.简单生产模型 生产消息 2.回调消息生产模型 3.事物消...

  • note

    Java IO,NIO,NIO2 以及与操作系统,磁盘 IO NIO模型selector NIO的核心是IO线程池...

  • Docker(单机Kafka安装)

    1. pull镜像 2. 创建通信网络。zookeeper和kafka之间的通信 3. 创建容器 KAFKA_AD...

网友评论

      本文标题:kafka 网络模型2 Selector

      本文链接:https://www.haomeiwen.com/subject/tokyzhtx.html