美文网首页kafka
kafka架构师4-图解kafka源码2

kafka架构师4-图解kafka源码2

作者: fat32jin | 来源:发表于2020-08-26 23:17 被阅读0次

    1.1. 如何处理响应消息? 0;11:00 ~ 0;30:00

    上节内容NetworkClient.poll 方法
    response.request().callback().onComplete(response);

    Sender#completeBatch方法里面:

             //TODO 核心代码 把异常的信息也给带过去了
             //我们刚刚看的就是这儿的代码
            //里面调用了用户传进来的回调函数
            //回调函数调用了以后
            //说明我们的一个完整的消息的发送流程就结束了。
            batch.done(baseOffset, timestamp, exception);
            ——》RecordBatch#done
    

    1. 2. 消息发送完了以后内存如何处理?

            this.accumulator.deallocate(batch); // 回收
           ——》free.deallocate
    

    1.3. 消息异常如何处理? 到 ~ 0;34:00

    1.4. 如何处理超时的批次? 0;34:00 ~ 0;45:00

    Sender#run 开始
    Sender#completeBatch方法
    ——》RecordBatch#done
    ——》 DemoCallBack#onCompletion方法
    RecordAccumulator #abortExpiredBatches方法

    batch#maybeExpire

    1. 5. 如何处理长时间没有收到响应的消息 0;49:00~ 1:02:00

    NetworkClient.poll 方法里面
    ——》
    //TODO 处理长时间没有接受到响应
    handleTimedOutRequests(responses, updatedNow);
    ——》processDisconnection
    //对这些请求进行处理
    //大家会看到一个比较有意思的事
    //自己封装了一个响应。这个响应里面没有服务端响应消息(服务端没给响应)
    //失去连接的状态表标识为true
    responses.add(new ClientResponse(request, now, true, null));

    image.png

    1. 6. 客户端源码精华总结 1:02:00 ~ 1:26:00

    关键方法代码:
    KafkaProducer#doSend
    this.sender.wakeup(); //唤醒sender线程
    ↓↓
    Sender#run
    1、核心流程封装在一个方法里面 doSend
    2、自定义异常,提示清晰
    3、底层无需处理异常,直接往上抛,核心逻辑处理
    4、面向对象的思想
    5、自己设计高性能的数据结构,线程安全,读多写少,
    6、高并发情况下,为了性能,线程安全,缩小锁的粒度,分段加锁
    7、为了减少gc,使用内存池设计
    8、客户端发送消息,支持异步,异步化设计(回调函数)
    9、kafka的网络设计,一个客户端管多个网络
    10、批处理设计(吞吐量上升)
    11、多个同一broker请求合并一起
    12、响应处理,考虑全面
    13、支持序列化(自定义序列化格式),支持配置压缩
    14、粘包、拆包的处理(思路奇妙,代码经典)

    2 . 观察Kafka源码的包(服务端代码) 1:43:00

    超高并发的网络架构.png

    core包
    Kafka.scala 类 main 方法
    ——》 kafkaServerStartable.startup
    ——》KafkaServerStartable#starup 方法
    ——》KafkaServer类 startup()
    ——》SocketServer#startup()
    //核心的线程
    //在Acceptor类的主构造函数里面,启动了3个Processor线程
    val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize,
    brokerId, processors.slice(processorBeginIndex, processorEndIndex),
    connectionQuotas)

    2.1 Acceptor线程是如何启动的? 1:53:00 ~
    SocketServer#startup()
    ——》Acceptor类 run()

    1. Processor线程是如何启动的? 2:00:00 ~ 2:30:00 ~
      Acceptor类 run()
      Acceptor类 accept()

    Processor 类 run 方法

    1. Processor线程是如何接收请求的?
      2:26:00 ~
      Processor 类 run 方法
      ->Processor#poll
      -> Selector#poll

    while (isRunning) {
    try {
    // setup any new connections that have been queued up
    //读取每个SocketChannel,把每个SocketChannel
    //都往Selector上面注册OP_READ事件。
    configureNewConnections()
    // register any new responses for writing
    //TODO 看起来像是处理响应的。绑定 OP_WRITE
    processNewResponses()
    //我们大胆的猜测,根据我们之前的了解
    //读取和发送请求的代码应该都是在这个方法里面完成的。
    //TODO 再次进去
    poll()
    //TODO 用来处理接收到当的请求
    processCompletedReceives()
    //todo 处理我们已经发送出去的响应
    processCompletedSends()
    processDisconnected()
    } catch {

    1. Processor线程是如何处理StateReceiver的请求的?
      2:37:00 ~
      <kafka精讲40讲> 服务器调优
      <图解kafka70讲>

    相关文章

      网友评论

        本文标题:kafka架构师4-图解kafka源码2

        本文链接:https://www.haomeiwen.com/subject/cqhprktx.html