美文网首页
kafka之网络模型总结

kafka之网络模型总结

作者: 外星人rsz | 来源:发表于2019-06-28 22:38 被阅读0次

    弄清楚kafka的网络模型原理,能很好的帮助理解和优化kafka服务。kafka底层的网络通信,没有使用第三方rpc实现,如netty等,而是使用了java的NIO实现的一套自己的通信框架协议。本文主要描述kafka基础网络通信的相关实现原理,版本为1.1.0。

    java NIO具体细节不再描述,主要包含3个部分:

    • Channel:连接,如FileChannel、SocketChannel等,表示连接通道,阻塞和非阻塞(FileChannel不支持)模式
    • Buffer:缓存,可在读写两种模式中切换
    • Selector:选择器,可实现一个线程处理多个连接

    kafka实现网络模型主要涉及到3个类:

    • SocketServer:实现监听listeners,并构建AcceptorProcessorConnectionQuotas等类,用于接收、处理、解析request和response。此处重要监控指标:NetworkProcessorAvgIdlePercentRequestQueueSizeResponseQueueSize
    • KafkaApis:负责处理broker支持的各种通信协议,如PRODUCE/FETCH/LIST_OFFSETS/LEADER_AND_ISR/HEARTBEAT
    • KafkaRequestHandlerPool:负责接收消息,处理SocketServer接收的请求,并构建response返回给SocketServer。此处重要监控指标:RequestHandlerAvgIdlePercent

    通信概况

    kafka网络模型_精简

    kafka接收消息、处理并返回,主要有以下步骤:

    1. Acceptor:监听OP_ACCEPT,将连接的channel以round-robin的方式选择processor进行处理

    2. Processor:监听连接的OP_READOP_WRITE,主要负责读写和解析数据

      • 读取客户端消息解析后放入requestQueue,解析头部信息
      • responseQueue中的消息发送回客户端
    3. KafkaRequestHandler:从requestQueue中获取连接,根据头部信息获取对应的KafkaApi协议进行相关处理,并通过回调,将处理后的结果通过RequestChannel写到对应ProcessorresponseQueue中,等待Processor线程处理

    通信处理详细步骤

    kafka网络模型

    如图所示,其中,蓝色线表示请求处理及流向,绿色组件表示消息缓存。

    1. SocketServer中,主要功能有:

      • 通过配置的listeners可以监听多个interface/port,构建对应的Acceptor
      • 每个Acceptor构建num.network.threadsProcessor,用于处理连接请求
      • 构建RequestChannel,保存解析好的连接请求
      # 主要逻辑
      private def addProcessors(acceptor: Acceptor, endpoint: EndPoint, newProcessorsPerListener: Int): Unit = synchronized {
          ...
          for (_ <- 0 until newProcessorsPerListener) {
            val processor = newProcessor(nextProcessorId, connectionQuotas, listenerName, securityProtocol, memoryPool)
            listenerProcessors += processor
            requestChannel.addProcessor(processor)
            nextProcessorId += 1
          }
          listenerProcessors.foreach(p => processors.put(p.id, p))
          acceptor.addProcessors(listenerProcessors)
        }
      
    2. Acceptor为线程,主要功能有:

      • 监听OP_ACCEPT事件,不断循环获取已经accept的连接
      • 判断每个ip的连接数是否超过quota限制
      • 通过round-robin的方式,选择Processor,放入Processor对应的newConnections缓存中
      • 代码逻辑主要看run()方法
    3. Processor为线程,主要功能为缓存新建连接、接收并缓存数据、缓存返回信息、处理断开连接等

      • configureNewConnections(): 处理新建的连接,监听OP_READ事件,等待读取数据
      • poll()
        • 真正读取数据,并放入接收缓存队列stagedReceives,缓存所有channel的请求
        • 拿出每个channel的第一个请求,解析协议头部,放入completedReceives缓存中
        • 如果channel写出ready,则进行write,将response返回给客户端
      • processCompletedReceives():将请求解析为Request并放入requestQueue缓存
      # 主要代码逻辑
      override def run() {
          startupComplete()
          try {
            while (isRunning) {
              try {
                // setup any new connections that have been queued up
                configureNewConnections()
                // register any new responses for writing
                processNewResponses()
                poll()
                processCompletedReceives()
                processCompletedSends()
                processDisconnected()
                ...
      
    4. KafkaRequestHandlerPool初始化num.io.threadsKafkaRequestHandler

      • requestQueue中拿去request,并根据协议头,选择对应的KafkaApi进行处理
      • 使用回调,将处理完成的response通过KafkaChannel放入当前channel对应的ProcessorresponseQueue

    由此可见,kafka内部通信使用了NIO+缓存+异步,从而极大提升了kafka的单机并发能力。

    至此,整个网络处理大概步骤完成,其中很多细节内容,这里不再赘述,感兴趣的同学可以看源码或一起交流。

    通信协议

    kafka网络模型_协议规范

    kafka接收到的数据内容格式如上图所示:

    • 4B: 整个数据的长度,最大为2^31 - 1,超过socket.request.max.bytes则报错,默认是100M
    • Header
      • 2B: api_key
      • 2B: api_version,api_key+api_version,可以决定当前消息的协议,如Producer、Fetcher、LeaderAndISR等
      • 4B: correlation_id,返回给客户端时携带
      • clientId: string
        • 2B: string长度
        • remain: string具体内容
    • Body(Payload),根据不同的协议使用不同的解析方法,此处以Produce协议为例
      • 2B: acks,写入的ack值,-1、0、1等,分别表示写入的可靠性要求
      • 4B: timeout,表示此Produce请求的超时时间
      • topic_data[]: Array
        • 4B: array长度
        • topic_data
          • topic_name: string
            • 2B: string长度
            • remain: string具体内容
          • partition_data[]: Array
            • 4B: array长度
            • partition_data
              • 4B: partition_id
              • 4B: 消息体长度
              • remain: MemoryRecords

    KafkaApi支持各种协议,具体可参见源码,这里只大概描述Produce协议的主要内容。基于此,kafka各版本之间可以保证很好的协议兼容性。

    优化

    基于以上分析,我们通过分析相关监控指标,进行了相关调优工作,主要调优内容为(仅供参考):

    zookeeper.session.timeout.ms=30000
    num.network.threads=12
    num.io.threads=16
    num.replica.fetchers=3
    replica.fetch.max.bytes=2097152
    replica.fetch.response.max.bytes=20971520
    

    调整后,cpu的负载明显提升,更加有效的利用机器资源,processor和handler的idle比例明显提高,isr expand/shrink出现频次和梳理也明显降低,大大提升的集群的稳定性。

    结尾

    本文主要介绍kafka内部的通信模型,kafka组件还有很多模块,后续会不断深入学习和理解,欢迎大家一起交流和学习。最近Pulsar也有兴起之势,后续也需要学习,真是学无止境啊~~

    参考

    matt博客

    相关文章

      网友评论

          本文标题:kafka之网络模型总结

          本文链接:https://www.haomeiwen.com/subject/rouycctx.html