美文网首页
kafka客户端发送批量消息流程分析

kafka客户端发送批量消息流程分析

作者: 飞奔的白牛 | 来源:发表于2020-10-30 23:33 被阅读0次

1、根据分区申请内存块

buffer = free.allocate(size, maxTimeToBlock);

2、构造batch对象加入Dequeue

MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);

ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, nowMs);

3、将key、value记录到buffer中

RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq, nowMs);

if (result.batchIsFull || result.newBatchCreated) {

log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);

  this.sender.wakeup();  //满足条件后唤醒线程发送batch

}

4、sender线程发送批次

sendProduceRequests(batches, now);

MemoryRecords records = batch.records();  //此处将buffer中的record读取出来

client.send(clientRequest, now);  

相关文章

网友评论

      本文标题:kafka客户端发送批量消息流程分析

      本文链接:https://www.haomeiwen.com/subject/zroryktx.html