1. 掌握内存池设计 0:20:00 ~ 0:47:44
目的: 减少fullgc 概率
原理图:
5. 内存池.pngSender#produceRequest
onComplete 回调方法里面
—》Sender#completeBatch
—》RecordAccumulator#deallocate
—》BufferPool#deallocate()方法
2. 掌握生产者消息发送流程 0:48:00~ 1:00:00
2.1 一个batch何时可以发? 1:00:00~1:20:00
Sender#run方法
RecordAccumulator#ready
2.3 筛选可以发请求的broker 1:31:00~1:50:50
2. 筛选可以发送消息的broker.png3. 掌握Kafka网络设计 1:50:50~2:30:00
一个客户端维护多个链接
3. Kafka的网络设计.png终于可以发送消息了,selector.poll()方法
4. 终于发送网络请求了!.pngSender#run()
——》this.client.poll()
——》实现类 NetworkClient#pool
this.selector.poll() 执行网络IO的操作。 NIO
——》 实现类 Selector.poll()
——》pollSelectionKeys //立马就要对这个Selector上面的key要进行处理
//TODO 核心的代码来了
//去最后完成网络的连接
//如果我们之前初始化的时候,没有完成网络连接的话,这儿一定会帮你
//完成网络的连接。
if (channel.finishConnect())
//里面不断的读取数据,读取数据的代码我们之前就已经分析过
//里面还涉及到粘包和拆包的一些问题。
while ((networkReceive = channel.read()) != null)
addToStagedReceives(channel, networkReceive);
4 . 掌握生产者处理响应消息的方法-粘包/拆包技巧 2:31:00~ 3:00:00
接上 Selector#pollSelectionKeys 方法里面
if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
NetworkReceive networkReceive;
//接受服务端发送回来的响应(请求)
//networkReceive 代表的就是一个服务端发送
//回来的响应
//里面不断的读取数据,读取数据的代码我们之前就已经分析过
//里面还涉及到粘包和拆包的一些问题。
while ((networkReceive = channel.read()) != null)
addToStagedReceives(channel, networkReceive);
方法 Kafkachannel.read()
——》Kafkachannel#readFromReadableChannel 分包逻辑
回退到selector.poll() 方法
addToCompletedReceives
——》 this.completedReceives.add
网友评论