1.1. 如何处理响应消息? 0;11:00 ~ 0;30:00
上节内容NetworkClient.poll 方法
response.request().callback().onComplete(response);
Sender#completeBatch方法里面:
//TODO 核心代码 把异常的信息也给带过去了
//我们刚刚看的就是这儿的代码
//里面调用了用户传进来的回调函数
//回调函数调用了以后
//说明我们的一个完整的消息的发送流程就结束了。
batch.done(baseOffset, timestamp, exception);
——》RecordBatch#done
1. 2. 消息发送完了以后内存如何处理?
this.accumulator.deallocate(batch); // 回收
——》free.deallocate
1.3. 消息异常如何处理? 到 ~ 0;34:00
1.4. 如何处理超时的批次? 0;34:00 ~ 0;45:00
Sender#run 开始
Sender#completeBatch方法
——》RecordBatch#done
——》 DemoCallBack#onCompletion方法
RecordAccumulator #abortExpiredBatches方法
batch#maybeExpire
1. 5. 如何处理长时间没有收到响应的消息 0;49:00~ 1:02:00
NetworkClient.poll 方法里面
——》
//TODO 处理长时间没有接受到响应
handleTimedOutRequests(responses, updatedNow);
——》processDisconnection
//对这些请求进行处理
//大家会看到一个比较有意思的事
//自己封装了一个响应。这个响应里面没有服务端响应消息(服务端没给响应)
//失去连接的状态表标识为true
responses.add(new ClientResponse(request, now, true, null));
1. 6. 客户端源码精华总结 1:02:00 ~ 1:26:00
关键方法代码:
KafkaProducer#doSend
this.sender.wakeup(); //唤醒sender线程
↓↓
Sender#run
1、核心流程封装在一个方法里面 doSend
2、自定义异常,提示清晰
3、底层无需处理异常,直接往上抛,核心逻辑处理
4、面向对象的思想
5、自己设计高性能的数据结构,线程安全,读多写少,
6、高并发情况下,为了性能,线程安全,缩小锁的粒度,分段加锁
7、为了减少gc,使用内存池设计
8、客户端发送消息,支持异步,异步化设计(回调函数)
9、kafka的网络设计,一个客户端管多个网络
10、批处理设计(吞吐量上升)
11、多个同一broker请求合并一起
12、响应处理,考虑全面
13、支持序列化(自定义序列化格式),支持配置压缩
14、粘包、拆包的处理(思路奇妙,代码经典)
2 . 观察Kafka源码的包(服务端代码) 1:43:00
超高并发的网络架构.pngcore包
Kafka.scala 类 main 方法
——》 kafkaServerStartable.startup
——》KafkaServerStartable#starup 方法
——》KafkaServer类 startup()
——》SocketServer#startup()
//核心的线程
//在Acceptor类的主构造函数里面,启动了3个Processor线程
val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize,
brokerId, processors.slice(processorBeginIndex, processorEndIndex),
connectionQuotas)
2.1 Acceptor线程是如何启动的? 1:53:00 ~
SocketServer#startup()
——》Acceptor类 run()
- Processor线程是如何启动的? 2:00:00 ~ 2:30:00 ~
Acceptor类 run()
Acceptor类 accept()
Processor 类 run 方法
- Processor线程是如何接收请求的?
2:26:00 ~
Processor 类 run 方法
->Processor#poll
-> Selector#poll
while (isRunning) {
try {
// setup any new connections that have been queued up
//读取每个SocketChannel,把每个SocketChannel
//都往Selector上面注册OP_READ事件。
configureNewConnections()
// register any new responses for writing
//TODO 看起来像是处理响应的。绑定 OP_WRITE
processNewResponses()
//我们大胆的猜测,根据我们之前的了解
//读取和发送请求的代码应该都是在这个方法里面完成的。
//TODO 再次进去
poll()
//TODO 用来处理接收到当的请求
processCompletedReceives()
//todo 处理我们已经发送出去的响应
processCompletedSends()
processDisconnected()
} catch {
- Processor线程是如何处理StateReceiver的请求的?
2:37:00 ~
<kafka精讲40讲> 服务器调优
<图解kafka70讲>
网友评论