美文网首页kafka
kafka图解源码-2.0 服务端代码设计1-网络

kafka图解源码-2.0 服务端代码设计1-网络

作者: fat32jin | 来源:发表于2021-02-26 06:07 被阅读0次

    31 )服务端代码设计-观察-kafka源码的包 31} 0:7

    重点代码包:

    broker:
    D:\Git_SRC\JavaProjects\kafka-0.10.1.0-nx-src\core\src\main\scala\kafka\controller

    消费者:
    D:\Git_SRC\JavaProjects\kafka-0.10.1.0-nx-src\core\src\main\scala\kafka\coordinator

    工具类:
    D:\Git_SRC\JavaProjects\kafka-0.10.1.0-nx-src\core\src\main\scala\kafka\network
    D:\Git_SRC\JavaProjects\kafka-0.10.1.0-nx-src\core\src\main\scala\kafka\tools

    32 ) 服务端代码设计-acceptor线程是如何启动的 32} 0:23

    D:\Git_SRC\JavaProjects\kafka-0.10.1.0-nx-src\core\src\main\scala\kafka\Kafka.scala

    ——》main()

    //启动服务的时候 会传递一些参数,这个地方应该就是去解析一些参数。
    val serverProps = getPropsFromArgs(args)
    val kafkaServerStartable = KafkaServerStartable.fromProps(serverProps)
    //TODO 核心代码
    kafkaServerStartable.startup
    kafkaServerStartable.awaitShutdown

    ——》KafkaServerStartable.startup()
    //TODO 启动服务
    server.startup()
    ——》KafkaServer.startup()
    //整个Kafka 服务端的功能 都是在这个里面。

     //NIO的服务端
        socketServer = new SocketServer(config, metrics, kafkaMetricsTime)
        socketServer.startup() 
    

    ——》 SocketServer.startup() ↓▲

    // 接收和发送 请求的时候一些缓存的大小
    val sendBufferSize = config.socketSendBufferBytes
    val recvBufferSize = config.socketReceiveBufferBytes
    //当前broker主机的id
    val brokerId = config.brokerId

    //核心的线程
    //在Acceptor类的主构造函数里面,启动了3个Processor线程
    val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId,

    // Utils是一个工具类。里面有newThread这样一个方法
    //这个方法的作用就是用来帮我们启动线程使用的。
    //我们阅读代码的时候要知道,如果一个线程被执行start方法
    //那我们接下来就是要去看他的run方法
    Utils.newThread("kafka-socket-acceptor-%s-%d".format(protocol.toString, endpoint.port), acceptor, false).start()
    acceptor.awaitStartup()

     ——》SocketServer.Acceptor.构造函数()
          ——》SocketServer.Acceptor.openServerSocket()
             //如果大家看不懂这儿的这几句代码
             //一定要下去补一下javaNIO方面的知识。
             val serverChannel = ServerSocketChannel.open()
           
           ——》SocketServer.Acceptor.run()
                                      ▼
    
    • 服务端启动,客户端(生产者)发送过来请求,服务端对请求进行处理,服务端给客户端发送响应。
      *     客户端接受到响应以后 -》 下一个请求的发送 
      */
      
      serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
      //服务一直就在不断的循环
      while (isRunning) {
      //selecotr 查看是否有 事件注册上来。
      val ready = nioSelector.select(500)
      //如果是客户端发送过来 要进行网络连接的请求。
      if (key.isAcceptable)
      //到这个方法里面去处理
      accept(key, processors(currentProcessor))

    33 ) 服务端代码设计-processor线程是如何启动的? 33 1 } 0:13

    入口:
    ——》SocketServer.Acceptor.run()
    //processors(线程一,线程二,线程三)
    //我们发端Acceptor线程启动起来以后
    //如果有请求发送过来,会把这些请求轮询的发送给不同的
    //Processor线程去处理。
    //processors(0) = 第一个线程处理
    //processors(1) = 第二个线程处理
    //prodcessrs(2) = 第三个线程处理
    accept(key, processors(currentProcessor))

    33 ) 服务端代码设计-processor线程是如何启动的? 33 2 } 0:7

    入口:
    ——》SocketServer.Acceptor.run()

    while (iter.hasNext && isRunning) {
    try {
    val key = iter.next
    iter.remove()
    //如果是客户端发送过来 要进行网络连接的请求。
    if (key.isAcceptable)
    //到这个方法里面去处理

                  //processors(线程一,线程二,线程三)
                  //我们发端Acceptor线程启动起来以后
                  //如果有请求发送过来,会把这些请求轮询的发送给不同的
                  //Processor线程去处理。
                  //processors(0) = 第一个线程处理
                  //processors(1) = 第二个线程处理
                  //prodcessrs(2) = 第三个线程处理
                  accept(key, processors(currentProcessor))
                else
                  throw new IllegalStateException("Unrecognized key state for acceptor thread.")
    
                // round robin to the next processor thread
                currentProcessor = (currentProcessor + 1) % processors.length
    

    34 ) 服务端代码设计-processor线程是如何启动的? 33 3 } 0:12

    入口:
    ——》SocketServer.Acceptor.run()
    accept(key, processors(currentProcessor))
    ——》SocketServer.accept()
    //TODO processor调用accept方法对socketChannel进行处理
    processor.accept(socketChannel)
    ——》Processor.accept()
    //把获取到SocketChannel存入到了自己的队列
    newConnections.add(socketChannel)

    0:5:18

    35) 服务端代码设计-网络-processor是如何接收请求的(1) -34}0:12

    接上回:
    ——》SocketServer.Processor.run()
    // setup any new connections that have been queued up
    //读取每个SocketChannel,把每个SocketChannel
    //都往Selector上面注册OP_READ事件。
    configureNewConnections()
    ——》SocketServer.configureNewConnections
    //TODO 注册OP_READ事件
    selector.register(connectionId, channel)
    ——》 Selector.register()
    //往自己的Selector上面注册OP_READ事件
    //这样的话,Processor线程就可以 读取客户端发送过啦id连接。
    SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_READ);
    //kafka里面对SocketChannel自己进行了封装
    //封装了一个KakaChannel
    KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize);
    //key和channel
    key.attach(channel);
    //所以我们服务端这儿代码 跟我们客户端的网络部分的代码是复用
    //channels里面维护了多个网络连接。
    this.channels.put(id, channel);

    36) 服务端代码设计-网络-processor是如何接收请求的(2) -34}0:7

    回到上回的:
    ——0:》SocketServer.Processor.run()
    // register any new responses for writing
    //TODO 看起来像是处理响应的。绑定 OP_WRITE
    processNewResponses()
    //我们大胆的猜测,根据我们之前的了解
    //读取和发送请求的代码应该都是在这个方法里面完成的。
    //TODO 再次进去
    poll()
    ——》SocketServer.Processor.poll()
    ——1:》Selector.poll()
    //从Selector上找到有多少个key注册了
    int readyKeys = select(timeout);
    //立马就要对这个Selector上面的key要进行处理。
    pollSelectionKeys(this.nioSelector.selectedKeys(), false, endSelect);
    pollSelectionKeys(immediatelyConnectedKeys,
    ——1: 》Selector.pollSelectionKeys()
    //根据key找到对应的KafkaChannel
    KafkaChannel channel = channel(key);
    //去最后完成网络的连接
    //如果我们之前初始化的时候,没有完成网络连接的话,这儿一定会帮你
    //完成网络的连接。
    if (channel.finishConnect()) {
    //网络连接已经完成了以后,就把这个channel存储到
    this.connected.add(channel.id());
    //里面不断的读取数据,读取数据的代码我们之前就已经分析过
    //里面还涉及到粘包和拆包的一些问题。
    while ((networkReceive = channel.read()) != null)
    addToStagedReceives(channel, networkReceive);
    ——》Selector.addToStagedReceives()
    //channel代表的就是一个网络的连接,一台kafka的主机就对应了一个channel连接。
    if (!stagedReceives.containsKey(channel))
    stagedReceives.put(channel, new ArrayDeque<NetworkReceive>());
    //往队列里面存放接受到响应
    deque.add(receive);

        ▲ 回退到 
    

    ——1: 》Selector.poll()
    //TODO 对stagedReceives里面的数据要进行处理
    addToCompletedReceives();

    37) 服务端代码设计-网络-processor线程是如何处理stagedreceives里的请求 -35}0:10"

    接上回:
    —— 》Selector.addToCompletedReceives()方法
    //获取到每个连接对应的 请求队列
    Deque<NetworkReceive> deque = entry.getValue();
    //获取到响应
    //对于我们服务端来说,这儿接收到的是请求
    NetworkReceive networkReceive = deque.poll();
    //把响应存入到completedReceives 数据结构里面
    this.completedReceives.add(networkReceive);
    ▲ 回退到
    ——0: 》SocketServer.Processor.run()
    //TODO 用来处理接收到当的请求
    processCompletedReceives()
    ——》SocketServer.Processor.processCompletedReceives()
    //遍历每一个请求 Scala,函数式编程
    selector.completedReceives.asScala.foreach { receive =>
    //对于获取到的请求按照协议进行解析,解析出来就是一个一个Request
    val req = RequestChannel.Request(processor = id, connectionId = receive.source, session
    //TODO 把request请求放入队列
    requestChannel.sendRequest(req)
    //TODO 取消OP_READ事件
    selector.mute(receive.source)

    综上几节: 请求处理示意图

    processor线程是如何处理请求.png

    38) 服务端代码设计-网络-requestqueue里的请求是如何被处理的 -36}0:10"

    入口
    ——》KafkaServer类startup方法()
    //TODO 就是它去处理的队列里面的请求的
    requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
    ——》KafkaRequestHandlerPool 构造函数

    val threads = new ArrayThread
    val runnables = new ArrayKafkaRequestHandler
    //默认启动8个线程,一般情况下,生产环境里面我们是要去设置这个参数。
    for(i <- 0 until numThreads) {
    //创建线程
    runnables(i) = new KafkaRequestHandler(i, brokerId, aggregateIdleMeter, numThreads, requestChannel, apis)
    threads(i) = Utils.daemonThread("kafka-request-handler-" + i, runnables(i))
    //线程启动起来了
    threads(i).start()
    ——》KafkaRequestHandler.run()
    //TODO 获取request对象
    req = requestChannel.receiveRequest(300)
    //TODO 交给KafkaApis进行最终的处理
    apis.handle(req)

    ——》KafkaApis.handle()

    39) 服务端代码设计-网络-request是如何被处理的 -37}0:23

    接上回:
    入口
    ——》 KafkaApis类 handle ()
    /因为我们用的是场景驱动的方式去分析的源码
    //我们之前一直分析的是 生产者那儿会发送过来请求
    //TODO 处理生产者发送过来的请求
    case ApiKeys.PRODUCE => handleProducerRequest(request)
    ——》 KafkaApis类 handleProducerRequest()
    //按照分区的方式去遍历数据。
    val (existingAndAuthorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) =
    produceRequest.partitionRecords.asScala.partition {
    //对方发送过来的数据进行一些判断
    //主要就是针对权限等等之类的事进行判断。
    case (topicPartition, _) => authorize(request.session, Describe, new

      //acks = 0
        //意味着生产者那儿不关心数据处理的结果。
        //所以我们不需要返回响应。
        if (produceRequest.acks == 0) {......        
          } else {
            requestChannel.noOperation(request.processor, request)
          }
        } else {
          //如果代码要走这儿,说明我们把数据处理完了以后
          //需要给客户端(生产者)返回响应
           //封装一个请求头
          val respHeader = new ResponseHeader(request.header.correlationId)
          //封装一个请求体(响应消息)
          val respBody = request.header.apiVersion match {
          //TODO 返回响应最重要的代码是在这儿
             //这儿给我们封装了一个Response的对象
          //这个对象就是服务端发送回给客户端(生产者)的
          requestChannel.sendResponse(new RequestChannel.Response(request, 
    
       ——》RequestChannel。sendResponse()
            //把响应存入到了一个队列里面。
           //先从数组里面取出对应Processor一个队列,然后把这个响应放入到这个队列里面。 
          responseQueues(response.processor).put(response)   
    

    40) 服务端代码设计-网络-服务端给客户端发送响应做哪些准备工作(1) -38}0:12

    接上回:
    ▲ 回退到
    ——0: 》SocketServer.Processor.run()
    //TODO 看起来像是处理响应的。绑定 OP_WRITE
    processNewResponses()
    —— 》 SocketServer.processNewResponses()
    selector.unmute(curr.request.connectionId)
    //如果我们是想发送请求,那么代码应该走的是这个分支。
    case RequestChannel.SendAction =>
    //TODO 发送请求
    sendResponse(curr)
    ——》SocketServer.Processor. sendResponse(curr)
    //正常情况走的是这个分支
    selector.send(response.responseSend)

    41) 服务端代码设计-网络-服务端给客户端发送响应做哪些准备工作(2) -38}0:7

    接上回:
    再次回到
    ——0: 》SocketServer.Processor.run()
    //TODO 看起来像是处理响应的。绑定 OP_WRITE
    processNewResponses()
    —— 》 SocketServer.processNewResponses()
    selector.unmute(curr.request.connectionId)
    //如果我们是想发送请求,那么代码应该走的是这个分支。
    case RequestChannel.SendAction =>
    //TODO 发送请求
    sendResponse(curr)
    ——》SocketServer.Processor. sendResponse(curr)
    //正常情况走的是这个else分支
    selector.send(response.responseSend)
    ——》selector.send()
    //TODO
    channel.setSend(send);
    ——》 KafakChannel.setSend( )
    //往KafkaChannel里面绑定一个发送出去的请求。
    this.send = send;
    //关键的代码来了
    //这儿绑定了一个OP_WRITE事件。
    //一旦绑定了这个事件以后,我们就可以往服务端发送请求了。
    this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);

    42) 服务端代码设计-网络-响应消息是如何发送给客户端的 -39}0:7

    接上回:
    再次回到
    ——0: 》SocketServer.Processor.run()
    //TODO 再次进去
    poll()
    ——》SocketServer.Processor.poll()
    ——》Selector .poll()
    ——》Selector .pollSelectionKeys()
    //已经完成响应消息的发送
    if (send != null) {
    this.completedSends.add(send);
    // this.completedSends. 是Selector 类的成员变量,代表已经完成发送的请求
    ▲ 回退到 ——0: 》SocketServer.Processor.run()
    //todo 处理我们已经发送出去的响应
    processCompletedSends()
    —— 》SocketServer. processCompletedSends()
    selector.unmute(send.destination)
    —— 》Selector.unmute()
    ——》KafkaChannel..unmute()
    //重新监听 OP_READ
    transportLayer.addInterestOps(SelectionKey.OP_READ);

    43) 服务端代码设计-网络-支持超过并发的kafka网络设计 -40}0:13

    三层网络架构,多层缓冲

    51 服务端发送响应会客户端.png

    相关文章

      网友评论

        本文标题:kafka图解源码-2.0 服务端代码设计1-网络

        本文链接:https://www.haomeiwen.com/subject/llfcxltx.html