美文网首页
Kafka2.0服务端启动源码

Kafka2.0服务端启动源码

作者: O_Neal | 来源:发表于2019-07-18 00:12 被阅读0次

      Kafka 服务端通过Kafka.scala的主函数main方法启动。KafkaServerStartable类提供读取配置文件、启动/停止服务的方法。而启动/停止服务最终调用的是KafkaServerstartup/shutdown方法。

    启动流程

    1. 启动 zk 客户端。
    2. 启动动态配置。
    3. 启动调度线程池。
    4. 启动日志管理器的后台线程,包括日志清理、日志刷盘、日志删除、日志压缩。
    5. 启动 NIO Socket 服务
      1. 初始化一个接收器Acceptor,即启动 NIO Socket。
      2. 添加num.network.threads个接收器到请求通道RequestChannel的处理器缓存ConcurrentHashMap,key 为递增编号,value 为处理器Processor
      3. Acceptor执行CountDownLatch.await等待通知启动。
      4. 缓存AcceptorConcurrentHashMap,key 为EndPoint,value 为Acceptor
    6. 启动副本管理器。
    7. 在 zk 注册 broker。
    8. 启动控制器。
    9. 启动组协调器。
    10. 启动事务协调器。
    11. 初始化KafkaApis
    12. 初始化处理器线程缓存池
      1. 启动num.io.threads个请求处理器线程KafkaRequestHandler
      2. 从阻塞队列ArrayBlockingQueue获取请求,调用KafkaApis.handle方法,进行集中处理请求。
    13. 启动处理器线程
      1. 首先CountDownLatch.countDown通知唤醒Acceptor线程。
        1. 使用NIO.select轮询。
        2. 如果有可接收就绪的事件,则将当前的SocketChannel加入缓存队列ConcurrentLinkedQueue
      2. 从上述缓存队列取出SocketChannel,绑定到KafkaChannel
      3. 将接收到的请求缓存到限长阻塞队列ArrayBlockingQueue

    请求处理流程

    服务端请求处理流程

    详细源码分析

    Acceptor 线程

    def run() {
      serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT) // 注册接收事件
      startupComplete() // 通知 Acceptor 线程
      var currentProcessor = 0
      while (isRunning) {
        val ready = nioSelector.select(500) // 轮询事件
        if (ready > 0) {
          val keys = nioSelector.selectedKeys()
          val iter = keys.iterator()
          while (iter.hasNext && isRunning) {
            val key = iter.next
            iter.remove()
            if (key.isAcceptable) { // 有可接受事件
              val processor = synchronized {
                currentProcessor = currentProcessor % processors.size
                processors(currentProcessor) // 缓存 Processor 
              }
              accept(key, processor) // 将 SocketChannel 缓存到队列
            }
          }
        }
      }
    }
    

    Processor 线程

    override def run() {
      startupComplete() // CountDownLatch.countDown 唤醒 Acceptor 线程。
      while (isRunning) {
        configureNewConnections() // 从缓存队列取出 SocketChannel,绑定到 KafkaChannel
        processNewResponses() // 处理返回客户端的响应
        poll() // Kafka.Selector 轮询读取/写入事件
        processCompletedReceives() // 处理客户端的请求,放到阻塞队列
        processCompletedSends() // 处理返回客户端响应后的回调
        processDisconnected() // 断开连接后的处理
      }
    }
    

    KafkaRequestHandler 线程阻塞队列

    def run() {
      while (!stopped) {
        val startSelectTime = time.nanoseconds
        // 从阻塞队列拉取请求
        val req = requestChannel.receiveRequest(300) 
    
        req match {
          case request: RequestChannel.Request =>
            try {
              apis.handle(request) // 调用`KafkaApis.handle`方法,进行集中处理请求。
            }
        }
      }
    }
    

    KSelector

      参考客户端源码分析。

    相关文章

      网友评论

          本文标题:Kafka2.0服务端启动源码

          本文链接:https://www.haomeiwen.com/subject/jyjhlctx.html