美文网首页
06、Kafka 如何实现NIO网络通信?

06、Kafka 如何实现NIO网络通信?

作者: 技术灭霸 | 来源:发表于2020-09-15 09:26 被阅读0次

完整的网络通信架构


Kafka网络通信组件最核心两个部分:SocketServer和KafkaRequestHanderPool

SocketServer 组件是核心,主要实现了 Reactor 模式,用于处理外部多个 Clients(这里 的 Clients 指的是广义的 Clients,可能包含 Producer、Consumer 或其他 Broker)的并 发请求,并负责将处理结果封装进 Response 中,返还给 Clients。

KafkaRequestHandlerPool 组件就是我们常说的 I/O 线程池,里面定义了若干个 I/O 线 程,用于执行真实的请求处理逻辑。

SocketServer

SocketServer 是 Kafka 网络通信层中最重要的子模块。它下辖的 Acceptor 线程、Processor 线程和 RequestChannel 等对象,都是实施网络通信的重要组成部分。

  1. AbstractServerThread 类:这是 Acceptor 线程和 Processor 线程的抽象基类,定义 了这两个线程的公有方法,如 shutdown(关闭线程)等。
  2. Acceptor 线程类:这是接收和创建外部 TCP 连接的线程。每个 SocketServer 实例只会 创建一个 Acceptor 线程。它的唯一目的就是创建连接,并将接收到的 Request 传递给下 游的 Processor 线程处理。
  3. Processor 线程类:这是处理单个 TCP 连接上所有请求的线程。每个 SocketServer 实 例默认创建若干个(num.network.threads)Processor 线程。Processor 线程负责将接 收到的 Request 添加到 RequestChannel 的 Request 队列上,同时还负责将 Response 返还给 Request 发送方
  4. Processor 伴生对象类:仅仅定义了一些与 Processor 线程相关的常见监控指标和常量 等,如 Processor 线程空闲率等。
  5. ConnectionQuotas 类:是控制连接数配额的类。我们能够设置单个 IP 创建 Broker 连 接的最大数量,以及单个 Broker 能够允许的最大连接数。
  6. TooManyConnectionsException 类:SocketServer 定义的一个异常类,用于标识连 接数配额超限情况。
  7. SocketServer 类:实现了对以上所有组件的管理和操作,如创建和关闭 Acceptor、 Processor 线程等。

Acceptor 线程

经典的 Reactor 模式有个 Dispatcher 的角色,接收外部请求并分发给下面的实际处理线
程。在 Kafka 中,这个 Dispatcher 就是 Acceptor 线程。

                              val sendBufferSize: Int,
                              val recvBufferSize: Int,
                              brokerId: Int,
                              connectionQuotas: ConnectionQuotas,
                              metricPrefix: String) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
 // 创建底层的NIO Selector对象
 // Selector对象负责执行底层实际I/O操作,如监听连接创建请求、读写请求等
  private val nioSelector = NSelector.open()
 // Broker端创建对应的ServerSocketChannel实例
  // 后续把该Channel向上一步的Selector对象注册
  val serverChannel = openServerSocket(endPoint.host, endPoint.port)
// 创建Processor线程池,实际上是Processor线程数组
  private val processors = new ArrayBuffer[Processor]()
  private val processorsStarted = new AtomicBoolean
  private val blockedPercentMeter = newMeter(s"${metricPrefix}AcceptorBlockedPercent",
    "blocked time", TimeUnit.NANOSECONDS, Map(ListenerMetricTag -> endPoint.listenerName.value))
}

Acceptor 代码中,提供了 3 个与 Processor 相关的方法,分别是 addProcessors、startProcessors 和 removeProcessors。

有了这三个方法,Acceptor 类就具备了基本的 Processor 线程池管理功能。不过,Acceptor 类逻辑的重头戏其实是 run 方法,它是处 理 Reactor 模式中分发逻辑的主要实现方法。

 def run(): Unit = {
   //注册OP_ACCEPT事件
    serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
   // 等待Acceptor线程启动完成
    startupComplete()
    try {
   // 当前使用的Processor序号,从0开始,最大值是num.network.threads - 1
      var currentProcessorIndex = 0
      while (isRunning) {
        try {
// 每500毫秒获取一次就绪I/O事件
          val ready = nioSelector.select(500)
          if (ready > 0) {// 如果有I/O事件准备就绪
            val keys = nioSelector.selectedKeys()
            val iter = keys.iterator()
            while (iter.hasNext && isRunning) {
              try {
                val key = iter.next
                iter.remove()

                if (key.isAcceptable) {
            // 调用accept方法创建Socket连接
                  accept(key).foreach { socketChannel =>
                    // 
                    var retriesLeft = synchronized(processors.length)
                    var processor: Processor = null
                    do {
                      retriesLeft -= 1
                   // 指定由哪个Processor线程进行处理
                      processor = synchronized {
                        currentProcessorIndex = currentProcessorIndex % processors.length
                        processors(currentProcessorIndex)
                      }
                   // 更新Processor线程序号
                      currentProcessorIndex += 1
                    } while (!assignNewConnection(socketChannel, processor, retriesLeft == 0))
                  }
                } else
                  throw new IllegalStateException("Unrecognized key state for acceptor thread.")
              } catch {
                case e: Throwable => error("Error while accepting connection", e)
              }
            }
          }
        }
        catch {
          case e: ControlThrowable => throw e
          case e: Throwable => error("Error occurred", e)
        }
      }
    } finally { // 执行各种资源关闭逻辑
      debug("Closing server socket and selector.")
      CoreUtils.swallow(serverChannel.close(), this, Level.ERROR)
      CoreUtils.swallow(nioSelector.close(), this, Level.ERROR)
      shutdownComplete()
    }
  }

基本上,Acceptor 线程使用 Java NIO 的 Selector + SocketChannel 的方式循环地轮询 准备就绪的 I/O 事件。这里的 I/O 事件,主要是指网络连接创建事件,即代码中的 SelectionKey.OP_ACCEPT。一旦接收到外部连接请求,Acceptor 就会指定一个 Processor 线程,并将该请求交由它,让它创建真正的网络连接。总的来说,Acceptor 线 程就做这么点事。

Processor 线程

如果说 Acceptor 是做入站连接处理的,那么,Processor 代码则是真正创建连接以及分 发请求的地方。显然,它要做的事情远比 Acceptor 要多得多。我先给出 Processor 线程 的 run 方法,你大致感受一下:

override def run(): Unit = {
    startupComplete()// 等待Processor线程启动完成
    try {
      while (isRunning) {
        try {
          configureNewConnections()// 创建新连接
          processNewResponses() 发送Response,并将Response放入到inflightResponse
          poll()// 执行NIO poll,获取对应SocketChannel上准备就绪的I/O操作
          processCompletedReceives()// 将接收到的Request放入Request队列
          processCompletedSends()// 为临时Response队列中的Response执行回调逻辑
          processDisconnected() // 处理因发送失败而导致的连接断开
          closeExcessConnections() // 关闭超过配额限制部分的连接
        } catch {
          case e: Throwable => processException("Processor got uncaught exception.", e)
        }
      }
    } finally { // 关闭底层资源
      debug(s"Closing selector - processor $id")
      CoreUtils.swallow(closeAll(), this, Level.ERROR)
      shutdownComplete()
    }
  }



每个 Processor 线程在创建时都会创建 3 个队列。注意,这里的队列是广义的队列,其底 层使用的数据结构可能是阻塞队列,也可能是一个 Map 对象而已,如下所示:

  private val newConnections = new ArrayBlockingQueue[SocketChannel](connectionQueueSize)
  private val inflightResponses = mutable.Map[String, RequestChannel.Response]()
  private val responseQueue = new LinkedBlockingDeque[RequestChannel.Response]()

队列一:newConnections

它保存的是要创建的新连接信息,具体来说,就是 SocketChannel 对象。这是一个默认上 限是 20 的队列,而且,目前代码中硬编码了队列的长度,因此,你无法变更这个队列的长 度。

每当 Processor 线程接收新的连接请求时,都会将对应的 SocketChannel 放入这个队列。 后面在创建连接时(也就是调用 configureNewConnections 时),就从该队列中取出 SocketChannel,然后注册新的连接。

队列二:inflightResponses

严格来说,这是一个临时 Response 队列。当 Processor 线程将 Response 返还给 Request 发送方之后,还要将 Response 放入这个临时队列。

为什么需要这个临时队列呢?这是因为,有些 Response 回调逻辑要在 Response 被发送 回发送方之后,才能执行,因此需要暂存在一个临时队列里面。这就是 inflightResponses 存在的意义。

队列三:responseQueue

看名字我们就可以知道,这是 Response 队列,而不是 Request 队列。这告诉了我们一个 事实:每个 Processor 线程都会维护自己的 Response 队列,而不是像网上的某些文章说 的,Response 队列是线程共享的或是保存在 RequestChannel 中的。Response 队列里 面保存着需要被返还给发送方的所有 Response 对象。

Processor 从底层 Socket 通道不断读取已接收到的网络请求,然 后转换成 Request 实例,并将其放入到 Request 队列。

总结

相关文章

网友评论

      本文标题:06、Kafka 如何实现NIO网络通信?

      本文链接:https://www.haomeiwen.com/subject/dbqkcktx.html